Как стать автором
Обновить

Python Multiprocessing. Обмен данными между процессами. Передача объектов пользовательских классов

Уровень сложности Сложный
Время на прочтение 15 мин
Количество просмотров 1.5K

Параллельное программирование — сложный, но очень полезный навык для программиста. Оно позволяет эффективно использовать мощности современных компьютеров с несколькими ядрами и процессорами. Это особенно важно при решении сложных задач, например, в инженерных расчетах, обработке мультимедийных данных, обучении нейросетей и многом другом.

Чтобы не перегружать статью, я не буду подробно писать про параллелизм на Python, а рассмотрю только узкую тему обмена данными в мультипроцессных программах. Для того, чтобы разобраться в этом, вам понадобятся базовые навыки работы с модулем Multiprocessing.

Если же вы еще не погружались в тему параллельного программирования и программирования на Python в общем, то вот несколько книг, которые вам в этом помогут:

Модуль Multiprocessing позволяет использовать так называемый истинный параллелизм, то есть создавать процессы, которые выполняются полностью независимо друг от друга. Про отличие от мнимого параллелизма и особенности модуля Threading можно почитать, например, по этой ссылке.

В этом случае процессы не имеют общей памяти и не могут просто так читать и изменять одни и те же переменные. По возможности рекомендуется и вовсе избегать межпроцессного обмена данными, но большинство сложных задач без этого не решаются, или решение слишком сложное.

Конечно же, в модуле multiprocessing реализован нативный способ передавать данные между процессами, и даже не один. Однако как только мы отходим от встроенных типов данных, то готовые решения уже не работают. О том, как с этим обходиться, я и расскажу в этой статье. Будем двигаться от простых примеров к сложным. Все примеры кода из этой статьи можно найти в этом проекте google collab.

Передача данных при создании процесса

При создании процесса, выполняющего функцию (target), можно передать параметры этой функции. Все объекты передаются в процесс «по значению», то есть для процесса создается копия переданного объекта. Изменения копии объекта внутри процесса не влияют на исходный объект. Такой подход можно использовать, если вы планируете только один раз скопировать объекты в дочерний процесс и дальше не следить за их изменениями. Однако следует быть очень внимательными, так как передача объектов по значению не согласуется с привычным поведением переменных изменяемых типов данных в Python (списки, словари, множества). Продемонстрируем разницу на следующем примере:

import multiprocessing as mp

def demoFunction(targetList,newElem,comment):
    targetList.append(newElem)
    print(comment, 'Inside demoFunction: ', targetList)


if __name__ == '__main__':
    myList = [1,2,3]
    demoFunction(myList, 4, 'Plain.')# вызовем функцию в основном процессе
    print('After "demoFunction" without multiprocessing', myList)

    # создадим процесс, который ответвляется от основного и выполнит функцию demoFunction 
    p = mp.Process(target=demoFunction, args=[myList,5,'Multiprocessing.']) 
   

    p.start()# запускаем процесс
    p.join() # дожидаемся завершения процесса

    print('After "demoFunction" with multiprocessing', myList)

Результат:

Plain. Inside demoFunction:  [1, 2, 3, 4]
After "demoFunction" without multiprocessing [1, 2, 3, 4]
Multiprocessing. Inside demoFunction:  [1, 2, 3, 4, 5]
After "demoFunction" with multiprocessing [1, 2, 3, 4]

Сначала функция demoFunction выполнилась в основном процессе и никакой межрпроцессной передачи данных не происходило. Список, переданный в функцию, был передан по ссылке, так как список относится к изменяемым типам данных. Поэтому после выполнения функции изменения произошли и со списком myList.

Затем та же функция выполнилась в дочернем процессе p, в котором была создана копия списка myList. Копия списка myList изменилась внутри дочернего процесса, однако на список myList в основном процессе это не повлияло.

Односторонняя передача данных в процесс при его создании работает просто и понятно, но не годится для случаев, когда несколько процессов должны изменять и читать общие данные в течение своей работы. Для этого используются другие инструменты.

Shared Memory

Этот способ я привожу только для полноты картины, далее в статье мы использовать его не будем, поэтому я позволю себе просто привести перевод документации.

Данные можно хранить в разделяемой памяти, используя объекты Value или Array.

import  multiprocessing as mp

def sharedMemDemo(targetNum, targetArr):
    targetNum.value = 3.1415927
    for i in range(len(targetArr)):
        targetArr[i] = -targetArr[i]

if __name__ == '__main__':
    myNum = mp.Value('d', 0.0)
    myArr = mp.Array('i', range(10))

    p = mp.Process(target=sharedMemDemo, args=(myNum, myArr))
    p.start()# запускаем процесс
    p.join() # дожидаемся завершения процесса

    print(myNum.value)
    print(myArr[:]) # срез от mp.Array дает тип list, который удобно печатается

Результат:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Аргументы 'd' и 'i', используемые при создании myNum и myArr, являются кодами типов. Эти объекты потоко‑ и процессо‑безопасны.

Для более гибкой работы с общей памятью можно использовать модуль multiprocessing.sharedctypes, который поддерживает создание произвольных объектов ctypes, выделенных из общей памяти.

Server process. SyncManager

SyncManager позволяет синхронизировать между процессами более сложные структуры данных, но этих структур не очень много. Их можно разделить на средства синхронизации и непосредственно структуры данных:

Средства синхронизации:

Структуры данных:

- Semaphore
- BoundedSemaphore
- Condition
- Event
- Lock
- RLock

- Namespace
- Queue
- Array
- Value
- dict
- list

При передаче данных между процессами данные подвергаются сначала сериализации, а потом десериализации с помощью pickle, поэтому содержимым списка, очереди, массива и словаря могут быть только структуры данных, которые pickable.

Пример использования:

import multiprocessing as mp

def demoFunc(targetDict, targetList):
    targetDict[1] = '1'
    targetDict['2'] = 2
    targetDict[0.25] = None
    targetList.reverse()

if __name__ == '__main__':
    with mp.Manager() as manager:
        myDict = manager.dict()
        myList = manager.list(range(10))

        p = mp.Process(target=demoFunc, args=(myDict, myList))
        p.start()
        p.join()

        print(myDict)
        print(myList)

Результат:

{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Этот вариант синхронизации данных более функционален: менеджер может использоваться даже процессами на разных компьютерах, — но в простых случаях лучше использовать shared memory, так как она работает быстрее.

Если в программе используются только встроенные типы данных, то функциональности multiprocessing хватит с головой. К тому же передаваемые через средства multiprocessing объекты защищены от конфликтов совместного доступа к разделяемой памяти. Стоит заметить, что это верно только для атомарных стандартных операций. Если вам важно, чтобы процесс имел эксклюзивный доступ к объекту на время выполнения составных операций, то соответствующую область кода необходимо защищать самостоятельно. Здесь на помощь придут Lock, Event и т. д.

Но что делать, если в программе используются пользовательские классы или объекты из других модулей? Решение для таких случаев можно построить на базе SyncManager. Как именно, я расскажу далее, но для этого надо сначала углубиться в механизм его работы.

Proxy-объекты

Менеджер создает служебный процесс (так называемый серверный процесс), в котором размещается централизованная версия разделяемого объекта. Процессы не имеют прямого доступа к этому объекту. Менеджер создает так называемый прокси‑объект для каждого процесса, и процессы обращаются именно к прокси‑объекту.

Прокси‑объекты используются в коде программы точно так же, как исходные объекты, и логика их работы скрыта от программиста. Они сериализуют данные, синхронизируют и координируют изменения, вносимые процессами, с централизованной версией объекта, размещенной в менеджере.

Замечание:
Важно помнить, что для прокси‑объектов не поддерживается сравнение по значению. Например,

>>> manager.list([1,2,3]) == [1,2,3]
False

Для сравнения необходимо использовать копии прокси‑объекта.

>>> manager.list([1,2,3])[:] == [1,2,3]
True

При сборке мусора прокси‑объект «отменяет свою регистрацию» в менеджере, который владеет разделяемым объектом. Разделяемый объект удаляется из процесса менеджера, когда больше нет прокси‑объектов, ссылающихся на него.

Пользовательские классы

Наконец перейдем к основной теме статьи — межпроцессному обмену пользовательскими классами. Сначала рассмотрим пример класса, который содержит только поля встроенных типов данных и функции.

class MyAwesomeClass:
    def __init__(self):
        self.x = 100
    def __str__(self):
        return str(self.x)
    def change(self):
        self.x = 200

Передадим этот объект в дочерний процесс как аргумент функции. За счет того, что все поля и функции в этом классе pickable, мы можем сделать это без каких‑либо дополнительных действий. В функции будем изменять поле класса тремя способами: прибавлять число, присваивать новое значение и присваивать новое значение с помощью функции класса.

import multiprocessing as mp 
 		
def changer(obj):
     obj.x = 500
     print('Hello! ',obj)
     obj.change()
     print('Hello! ',obj)
     obj.x += 1
     print('Hello! ',obj)

if __name__ == '__main__':
    MAC = MyAwesomeClass()						
    p = mp.Process(target = changer, args=(MAC,))
    p.start()
    p.join()		
    print('Here I am: ', MAC)

Результат:

Hello! 500
Hello! 200
Hello! 201
Here I am: 100

Как мы видим, все три способа изменения сработали, однако повлияли только на копию объекта в дочернем процессе, но не на исходный объект. В этом случае механизм такой же, как и рассмотренный нами выше для встроенных типов данных.

Передадим этот объект в дочерний процесс с помощью списка SyncManager. Перепишем функцию foo, чтобы она работала с первым элементом переданного списка. Снова будет изменять значение поля x тремя способами.

def changer(obj):
     obj[0].x = 500
     print('Hello! ',obj[0])
     obj[0].change()
     print('Hello! ',obj[0])
     obj[0].x += 1
     print('Hello! ',obj[0])	
					
if __name__ == '__main__':
    manager = mp.Manager()
    sharedList = manager.list()
    MAC = MyAwesomeClass()
    sharedList.append(MAC)
    p = mp.Process(target = changer, args=(sharedList,))
    p.start()
    p.join()					
    print('Here I am: ', sharedList[0])

Результат:

Hello! 100
Hello! 100
Hello! 100
Here I am: 100

В данном случае изменения не только не затронули исходный объект, но и не отразились даже в переданном списке. Это произошло потому, что у прокси-объектов по умолчанию отсутствует механизм оповещения об изменениях во вложенных структурах. Это можно обойти, изменив сам список, а не его содержимое, например, таким образом

def changer(obj):
    tmp = obj[0]# скопируем объект во временную переменную
    tmp.x = 500 # изменим временную переменную
    obj[0] = tmp # присвоим элементу списка значение временной переменной, это вызовет оповещение списка об изменениях
    print('Hello! ',obj[0])

Результат:

Hello! 500
Here I am: 500 

Такой способ вполне можно использовать, однако не стоит забывать про особенности копирования объектов пользовательских классов.

Класс с не pickable полями

Дополним рассмотренный ранее класс функцией‑генератором.

def generatorFunc():
    for i in range(10):
        yield i*i

class MyAwesomeClass:
  
  def __init__(self):
      self.x = 100
      self.gen = generatorFunc()
  
  def __str__(self):
      return str(self.x)
  
  def change(self):
  		self.x = 200

Теперь, если мы попробуем передать объект этого класса в дочерний процесс любым из описанных выше способов, мы получим ошибку
TypeError: cannot pickle 'generator' object

В некоторых случаях наиболее простым вариантом будет реализовать pickle сериализацию для пользовательского класса, но чаще всего  это слишком трудозатратно. К тому же pickle сериализация небезопасна.

К счастью, есть другой способ. 

Имплементация пользовательского прокси и менеджера

Чтобы мы могли работать с пользовательским классом по тому же принципу, что и со встроенными типами, нам необходимо создать соответствующие прокси‑классы и свой собственный менеджер.

От менеджера нам не надо ничего особенного, достаточно наследовать его от базового менеджера. Можно конечно использовать и сам базовый менеджер, но правила хорошего тона предписывают нам не влезать в стандартные классы чужих модулей. Поэтому мы создадим свой собственный класс:

from multiprocessing.managers import BaseManager

class MyManager(BaseManager):
    pass # да, и это все

Далее возьмем определение MyAwesomeClass (пока без генератора) и посмотрим подробнее, что нам нужно

class MyAwesomeClass:

    def __init__(self):				
        self.x = 100			

    def __str__(self):
        return str(self.x)
    
    def change(self):
		self.x = 200

x — публичный атрибут класса. Мы хотим, чтобы можно было его читать и изменять. Этот механизм уже реализован в классе multiprocessing.managers.NamespaceProxy, поэтому возьмем его в качестве родительского класса.

from multiprocessing.managers import NamespaceProxy

class MyAwesomeClassProxy(NamespaceProxy):
  pass # да, и тут тоже больше ничего не надо

По умолчанию  NamespaceProxy позволяет нам обращаться ко всем публичным функциям и читать и изменять все публичные поля класса.  Найти внятного описания в документации у меня не вышло, поэтому пришлось залезть в код и посмотреть, как это работает. Далее в статье я приведу кусок этого кода, но пока просто примите это на веру.

Теперь мы можем зарегистрировать класс в менеджере и использовать зарегистрированный тип для создания объектов

MyManager.register('MyAwesomeClassRegistred', MyAwesomeClass, MyAwesomeClassProxy)

M = MyManager()
M.start()
MAC = M.MyAwesomeClassRegistred()

Подробнее  о методе register можно прочитать в документации. Нам будут важны первые четыре параметра.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

  • typeid  - это "идентификатор типа", который будет использован в менеджере

  • callable - это вызываемый объект, используемый для создания объектов для этого идентификатора типа. В данном конкретном случае – имя класса.

  • proxytype - это подкласс BaseProxy, который используется для создания прокси-объектов для общих объектов с этим идентификатором типа. Если None, то класс прокси создается автоматически (AutoProxy).

  • exposed  используется для указания последовательности имен методов исходного класса, к которым прокси-объект должны иметь доступ с использованием BaseProxy.callmethod(). (Если exposed равен None, то вместо него используется proxytype.exposed, если он существует.) В случае, если список exposed не указан, для прокси-объекта будут доступны все публичные методы исходного класса. 

Вот как выглядит решение с прокси-классом целиком

import multiprocessing as mp
from multiprocessing.managers import NamespaceProxy, BaseManager

class MyAwesomeClass:
    def __init__(self):
         self.x = 100
    def __str__(self):
        return str(self.x)

def changer(obj):
    obj.x = 200
    print('Hello! ',obj.x)

class MyManager(BaseManager):
    pass

class MyAwesomeClassProxy(NamespaceProxy):
    pass

MyManager.register('MyAwesomeClassRegistred', MyAwesomeClass, MyAwesomeClassProxy)

if __name__ == '__main__':
    M = MyManager()
    M.start()
    MAC = M.MyAwesomeClassRegistred()
    p = mp.Process(target = changer, args=(MAC,))
    p.start()
    p.join()
    print('Here I am: ', MAC.x)

Результат:

Hello! 200
Here I am: 200 

Создание собственного прокси‑класса сделало код более читаемым. Теперь объект передается и меняется в явном виде без дополнительных оберток и переприсваиваний.

Давайте подробнее изучим, как устроен прокси‑класс. Реализация NamespaceProxy уже достаточно сложная, так что чтобы разобраться, как работает вызов функций и обращение к полям класса, давайте сначала поработаем с BaseProxy.

Разберем несколько примеров с exposed и различными способами вызова функций.

import multiprocessing as mp 
from multiprocessing.managers import BaseProxy, BaseManager


class MyManager(BaseManager):
    pass # да, и это все

 		
class MyAwesomeClass:
    def __init__(self):
        self.x = 100

    def awesome_func(self):
        print("Hello! I'm MyAwesomeClass. It's awesome_func")
    def idle_func(self):
        print("Hello! I'm MyAwesomeClass. It's idle_func")
    def exposed_func(self):
        print("Hello! I'm MyAwesomeClass. It's exposed_func")
    def hidden_func(self):
        print("Hello! I'm MyAwesomeClass. It's hidden_func")


class MyAwesomeClassProxy(BaseProxy):
    _exposed_ = ('exposed_func', )

    def exposed_func(self):
        print('First we hit MyAwesomeClassProxy ')
        self._callmethod('exposed_func')

    def idle_func(self):
        print('We are in MyAwesomeClassProxy, idle_func')

    def hidden_func_proxy(self):
        self._callmethod('hidden_func')

    def hidden_func_proxy2(self):
        self.hidden_func()

def allCaller(obj):
    #obj.awesome_func() 
    #obj.idle_func()
    obj.exposed_func()
    #obj.hidden_func_proxy()
    #obj.hidden_func_proxy2()


MyManager.register('MyAwesomeClassRegistred', MyAwesomeClass, MyAwesomeClassProxy)

if __name__ == '__main__':
    M = MyManager()
    M.start()
    MAC = M.MyAwesomeClassRegistred()
    allCaller(MAC)

Теперь, если мы передадим объект obj класса MyAwesomeClass в функцию allCaller и будем по очереди раскомментировать строки и запускать интерпретатор, то получим следующий результат.

>>obj.awesome_func() 

AttributeError: 'MyAwesomeClassProxy' object has no attribute 'awesome_func'

Функция awesome_func была объявлена только в исходном классе, никакого упоминания о ней нет в прокси-классе. Когда мы обращаемся в переданному в процесс объекту, мы на самом деле обращаемся к его прокси‑объекту, который имеет уже другой тип — не MyAwesomeClass, а MyAwesomeClassProxy. Соответственно, нам доступны только поля и функции прокси‑класса. Именно поэтому мы получаем AttributeError.

>>obj.idle_func()

We are in MyAwesomeClassProxy, idle_func

Функция idle_func объявлена в обоих классах. Но так как мы обращаемся к прокси-объекту, то активируется idle_func прокси-класса. До idle_func в MyAwesomeClass мы таким образом добраться не можем.

>>obj.exposed_func()

First we hitMyAwesomeClassProxy 
Hello! I'm MyAwesomeClass. It's exposed_fun

Функция exposed_func добавлена в кортеж _exposed_ прокси-класса, поэтому из прокси-класса мы можем обратиться к exposed_func изначального класса с помощью метода _callmethod. Так как это приватный метод прокси-класса, нам нужно сделать функцию-обертку, которую мы и будем использовать в основном коде.

>>obj.hidden_func_proxy()

AttributeError: method 'hidden_func' of <class 'mp_main.MyAwesomeClass'> object is not in exposed={'exposed_func'}

Если метод не добавлен в кортеж exposed, то _callmethod вызовет исключение, как в случае с hidden_func_proxy.

>>obj.hidden_func_proxy2()

AttributeError: 'MyAwesomeClassProxy' object has no attribute 'hidden_func'

И наконец в последнем примере происходит такая же ошибка, как в первом. Self указывает на объект прокси-класса, у которого не определена функция hidden_func.

Такой же принцип действует и для полей класса, ведь на самом деле, встречая код obj.x, интерпретатор вызывает встроенную функцию класса
getattribute(self, «x»). Соответственно, необходимо написать соответствующую функцию для прокси‑класса, что и реализовано в NamespaceProxy:

_exposed_ = (‘__getattribute__’, )

# здесь идет работа и с методами, и с полями класса
def __getattr__(self, key):
  if key[0] == '_':
    return object.__getattribute__(self, key)
  callmethod = object.__getattribute__(self, '_callmethod')
  return callmethod('__getattribute__', (key,))

Класс со вложенным генератором  

Вернемся к примеру со вложенным генератором. К сожалению, генератор не является pickable объектом, поэтому мы не сможем его передать через Manager. При этом нам важно, чтобы состояние генератора было согласовано между процессами, чтобы получать корректную последовательность значений, значит, вариант с копией нам тоже не подходит. Один из вариантов решения этой проблемы – обернуть вызов генератора в функцию класса. 

from multiprocessing import Manager, Process, Lock
from multiprocessing.managers import BaseManager, NamespaceProxy, BaseProxy, MakeProxyType
import time


def baz():
   for i in range(10):
       yield i*i
class MyAwesomeClass:
   def __init__(self):
       self.x = 100
       self.gen = baz()
   def __str__(self):
       return str(self.x)
   def next(self):
       return self.gen.__next__()


def demoFunction(obj, num, ConsoleLock):
   for i in range(4):
       ConsoleLock.acquire() # захватываем замок, чтобы второй процесс не вмешался в вывод текста в консоль
       print(num, 'Gen: ', obj.next())
       print(num, 'Proxy ownVar = ', obj.ownVar)
       ConsoleLock.release() # освободили консоль – освободили замок
       time.sleep(1)




class MyManager(BaseManager):
   pass


class MyAwesomeClassProxy(NamespaceProxy):
   _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'next')


   def __init__(self, *args, **kwargs):
       super().__init__(*args,**kwargs)
       self.ownVar = 10


   def next(self):
       self.ownVar += 1
       return self._callmethod('next')


MyManager.register('MyAwesomeClassRegistred', MyAwesomeClass, MyAwesomeClassProxy)


if __name__ == '__main__':
   M = MyManager()
   M.start()
   MAC = M.MyAwesomeClassRegistred()

   # замок нам понадобится, чтобы процессы не конфликтовали 
   # за консоль при печати, 
   # а не то вывод перемешается и будет ничего непонятно
   ConsoleLock = Lock() 

   p = Process(target = demoFunction, args=(MAC,1, ConsoleLock))
   p2 = Process(target = demoFunction, args=(MAC,2,ConsoleLock))
   p.start()
   p2.start()
   p.join()
   p2.join()

Результат:

1 Gen:  0
1 Proxy ownVar =  11
2 Gen:  1
2 Proxy ownVar =  12
1 Gen:  4
1 Proxy ownVar =  13
2 Gen:  9
2 Proxy ownVar =  14
1 Gen:  16
1 Proxy ownVar =  15
2 Gen:  25
2 Proxy ownVar =  16
1 Gen:  36
1 Proxy ownVar =  17
2 Gen:  49
2 Proxy ownVar =  18

Здесь важно обратить внимание на работу с генератором. Обращение к генератору и методу next() происходит только внутри исходного класса. Прокси‑класс вызывает метод next() исходного класса (поэтому название этого метода добавлено в exposed). То есть объект‑генератор не передается и не копируется в прокси‑объект. Мы видим, что его состояние меняется обоими процессами, т. е. оба процесса используют один и тот же генератор.

Однако у каждого процесса есть свой собственный прокси‑объект, к которому они и обращаются. Это видно на примере поля ownVar  — собственного поля прокси‑класса, которое отсутствует у исходного класса. Поле ownVar меняется у прокси‑объектов независимо друг от друга.

Подводим итоги

Передача данных между процессами замедляет работу программы в целом и кроме этого усложняет логику. В ситуациях, когда невозможно полностью избежать передачи данных, лучше ограничиться встроенными типами данных и механизмом Shared Memory. Применение собственных Proxy и Manager следует рассматривать только в случаях, когда других вариантов не осталось.

Написание и отладка собственного прокси‑класса с более или менее сложной структурой являются довольно трудоемкой задачей. Вложенные структуры данных могут вызывать проблемы, и нужно очень внимательно следить за тем, какие объекты изменяются в процессах, чтобы прописать соответствующую логику в прокси‑классе. Будьте готовы к тому, что простого решения не получится. Бояться, разумеется, этого тоже не стоит, просто надо трезво оценивать объем работы и не изобретать очередной велосипед.

Некоторые объекты и вовсе нельзя передать из одного процесса в другой, поэтому приходится придумывать обходные пути. Но об этом я напишу в другой раз.

Теги:
Хабы:
Если эта публикация вас вдохновила и вы хотите поддержать автора — не стесняйтесь нажать на кнопку
+6
Комментарии 2
Комментарии Комментарии 2

Публикации

Истории

Работа

Data Scientist
74 вакансии
Python разработчик
125 вакансий

Ближайшие события

Fin Academy Forum 2024
Дата 30 января
Время 10:00 – 18:30
Место
Москва Онлайн