Параллельное программирование — сложный, но очень полезный навык для программиста. Оно позволяет эффективно использовать мощности современных компьютеров с несколькими ядрами и процессорами. Это особенно важно при решении сложных задач, например, в инженерных расчетах, обработке мультимедийных данных, обучении нейросетей и многом другом.
Чтобы не перегружать статью, я не буду подробно писать про параллелизм на Python, а рассмотрю только узкую тему обмена данными в мультипроцессных программах. Для того, чтобы разобраться в этом, вам понадобятся базовые навыки работы с модулем Multiprocessing.
Если же вы еще не погружались в тему параллельного программирования и программирования на Python в общем, то вот несколько книг, которые вам в этом помогут:
Модуль Multiprocessing
позволяет использовать так называемый истинный параллелизм, то есть создавать процессы, которые выполняются полностью независимо друг от друга. Про отличие от мнимого параллелизма и особенности модуля Threading можно почитать, например, по этой ссылке.
В этом случае процессы не имеют общей памяти и не могут просто так читать и изменять одни и те же переменные. По возможности рекомендуется и вовсе избегать межпроцессного обмена данными, но большинство сложных задач без этого не решаются, или решение слишком сложное.
Конечно же, в модуле multiprocessing
реализован нативный способ передавать данные между процессами, и даже не один. Однако как только мы отходим от встроенных типов данных, то готовые решения уже не работают. О том, как с этим обходиться, я и расскажу в этой статье. Будем двигаться от простых примеров к сложным. Все примеры кода из этой статьи можно найти в этом проекте google collab.
Передача данных при создании процесса
При создании процесса, выполняющего функцию (target), можно передать параметры этой функции. Все объекты передаются в процесс «по значению», то есть для процесса создается копия переданного объекта. Изменения копии объекта внутри процесса не влияют на исходный объект. Такой подход можно использовать, если вы планируете только один раз скопировать объекты в дочерний процесс и дальше не следить за их изменениями. Однако следует быть очень внимательными, так как передача объектов по значению не согласуется с привычным поведением переменных изменяемых типов данных в Python (списки, словари, множества). Продемонстрируем разницу на следующем примере:
import multiprocessing as mp
def demoFunction(targetList,newElem,comment):
targetList.append(newElem)
print(comment, 'Inside demoFunction: ', targetList)
if __name__ == '__main__':
myList = [1,2,3]
demoFunction(myList, 4, 'Plain.')# вызовем функцию в основном процессе
print('After "demoFunction" without multiprocessing', myList)
# создадим процесс, который ответвляется от основного и выполнит функцию demoFunction
p = mp.Process(target=demoFunction, args=[myList,5,'Multiprocessing.'])
p.start()# запускаем процесс
p.join() # дожидаемся завершения процесса
print('After "demoFunction" with multiprocessing', myList)
Результат:
Plain. Inside demoFunction: [1, 2, 3, 4]
After "demoFunction" without multiprocessing [1, 2, 3, 4]
Multiprocessing. Inside demoFunction: [1, 2, 3, 4, 5]
After "demoFunction" with multiprocessing [1, 2, 3, 4]
Сначала функция demoFunction
выполнилась в основном процессе и никакой межрпроцессной передачи данных не происходило. Список, переданный в функцию, был передан по ссылке, так как список относится к изменяемым типам данных. Поэтому после выполнения функции изменения произошли и со списком myList
.
Затем та же функция выполнилась в дочернем процессе p, в котором была создана копия списка myList
. Копия списка myList
изменилась внутри дочернего процесса, однако на список myList
в основном процессе это не повлияло.
Односторонняя передача данных в процесс при его создании работает просто и понятно, но не годится для случаев, когда несколько процессов должны изменять и читать общие данные в течение своей работы. Для этого используются другие инструменты.
Shared Memory
Этот способ я привожу только для полноты картины, далее в статье мы использовать его не будем, поэтому я позволю себе просто привести перевод документации.
Данные можно хранить в разделяемой памяти, используя объекты Value или Array.
import multiprocessing as mp
def sharedMemDemo(targetNum, targetArr):
targetNum.value = 3.1415927
for i in range(len(targetArr)):
targetArr[i] = -targetArr[i]
if __name__ == '__main__':
myNum = mp.Value('d', 0.0)
myArr = mp.Array('i', range(10))
p = mp.Process(target=sharedMemDemo, args=(myNum, myArr))
p.start()# запускаем процесс
p.join() # дожидаемся завершения процесса
print(myNum.value)
print(myArr[:]) # срез от mp.Array дает тип list, который удобно печатается
Результат:
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Аргументы 'd' и 'i', используемые при создании myNum
и myArr
, являются кодами типов. Эти объекты потоко‑ и процессо‑безопасны.
Для более гибкой работы с общей памятью можно использовать модуль multiprocessing.sharedctypes
, который поддерживает создание произвольных объектов ctypes
, выделенных из общей памяти.
Server process. SyncManager
SyncManager позволяет синхронизировать между процессами более сложные структуры данных, но этих структур не очень много. Их можно разделить на средства синхронизации и непосредственно структуры данных:
Средства синхронизации: | Структуры данных: |
- Semaphore | - Namespace |
При передаче данных между процессами данные подвергаются сначала сериализации, а потом десериализации с помощью pickle
, поэтому содержимым списка, очереди, массива и словаря могут быть только структуры данных, которые pickable.
Пример использования:
import multiprocessing as mp
def demoFunc(targetDict, targetList):
targetDict[1] = '1'
targetDict['2'] = 2
targetDict[0.25] = None
targetList.reverse()
if __name__ == '__main__':
with mp.Manager() as manager:
myDict = manager.dict()
myList = manager.list(range(10))
p = mp.Process(target=demoFunc, args=(myDict, myList))
p.start()
p.join()
print(myDict)
print(myList)
Результат:
{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Этот вариант синхронизации данных более функционален: менеджер может использоваться даже процессами на разных компьютерах, — но в простых случаях лучше использовать shared memory,
так как она работает быстрее.
Если в программе используются только встроенные типы данных, то функциональности multiprocessing
хватит с головой. К тому же передаваемые через средства multiprocessing объекты защищены от конфликтов совместного доступа к разделяемой памяти. Стоит заметить, что это верно только для атомарных стандартных операций. Если вам важно, чтобы процесс имел эксклюзивный доступ к объекту на время выполнения составных операций, то соответствующую область кода необходимо защищать самостоятельно. Здесь на помощь придут Lock, Event
и т. д.
Но что делать, если в программе используются пользовательские классы или объекты из других модулей? Решение для таких случаев можно построить на базе SyncManager. Как именно, я расскажу далее, но для этого надо сначала углубиться в механизм его работы.
Proxy-объекты
Менеджер создает служебный процесс (так называемый серверный процесс), в котором размещается централизованная версия разделяемого объекта. Процессы не имеют прямого доступа к этому объекту. Менеджер создает так называемый прокси‑объект для каждого процесса, и процессы обращаются именно к прокси‑объекту.
Прокси‑объекты используются в коде программы точно так же, как исходные объекты, и логика их работы скрыта от программиста. Они сериализуют данные, синхронизируют и координируют изменения, вносимые процессами, с централизованной версией объекта, размещенной в менеджере.
Замечание:
Важно помнить, что для прокси‑объектов не поддерживается сравнение по значению. Например,
>>> manager.list([1,2,3]) == [1,2,3]
False
Для сравнения необходимо использовать копии прокси‑объекта.
>>> manager.list([1,2,3])[:] == [1,2,3]
True
При сборке мусора прокси‑объект «отменяет свою регистрацию» в менеджере, который владеет разделяемым объектом. Разделяемый объект удаляется из процесса менеджера, когда больше нет прокси‑объектов, ссылающихся на него.
Пользовательские классы
Наконец перейдем к основной теме статьи — межпроцессному обмену пользовательскими классами. Сначала рассмотрим пример класса, который содержит только поля встроенных типов данных и функции.
class MyAwesomeClass:
def __init__(self):
self.x = 100
def __str__(self):
return str(self.x)
def change(self):
self.x = 200
Передадим этот объект в дочерний процесс как аргумент функции. За счет того, что все поля и функции в этом классе pickable, мы можем сделать это без каких‑либо дополнительных действий. В функции будем изменять поле класса тремя способами: прибавлять число, присваивать новое значение и присваивать новое значение с помощью функции класса.
import multiprocessing as mp
def changer(obj):
obj.x = 500
print('Hello! ',obj)
obj.change()
print('Hello! ',obj)
obj.x += 1
print('Hello! ',obj)
if __name__ == '__main__':
MAC = MyAwesomeClass()
p = mp.Process(target = changer, args=(MAC,))
p.start()
p.join()
print('Here I am: ', MAC)
Результат:
Hello! 500
Hello! 200
Hello! 201
Here I am: 100
Как мы видим, все три способа изменения сработали, однако повлияли только на копию объекта в дочернем процессе, но не на исходный объект. В этом случае механизм такой же, как и рассмотренный нами выше для встроенных типов данных.
Передадим этот объект в дочерний процесс с помощью списка SyncManager
. Перепишем функцию foo
, чтобы она работала с первым элементом переданного списка. Снова будет изменять значение поля x
тремя способами.
def changer(obj):
obj[0].x = 500
print('Hello! ',obj[0])
obj[0].change()
print('Hello! ',obj[0])
obj[0].x += 1
print('Hello! ',obj[0])
if __name__ == '__main__':
manager = mp.Manager()
sharedList = manager.list()
MAC = MyAwesomeClass()
sharedList.append(MAC)
p = mp.Process(target = changer, args=(sharedList,))
p.start()
p.join()
print('Here I am: ', sharedList[0])
Результат:
Hello! 100
Hello! 100
Hello! 100
Here I am: 100
В данном случае изменения не только не затронули исходный объект, но и не отразились даже в переданном списке. Это произошло потому, что у прокси-объектов по умолчанию отсутствует механизм оповещения об изменениях во вложенных структурах. Это можно обойти, изменив сам список, а не его содержимое, например, таким образом
def changer(obj):
tmp = obj[0]# скопируем объект во временную переменную
tmp.x = 500 # изменим временную переменную
obj[0] = tmp # присвоим элементу списка значение временной переменной, это вызовет оповещение списка об изменениях
print('Hello! ',obj[0])
Результат:
Hello! 500
Here I am: 500
Такой способ вполне можно использовать, однако не стоит забывать про особенности копирования объектов пользовательских классов.
Класс с не pickable полями
Дополним рассмотренный ранее класс функцией‑генератором.
def generatorFunc():
for i in range(10):
yield i*i
class MyAwesomeClass:
def __init__(self):
self.x = 100
self.gen = generatorFunc()
def __str__(self):
return str(self.x)
def change(self):
self.x = 200
Теперь, если мы попробуем передать объект этого класса в дочерний процесс любым из описанных выше способов, мы получим ошибкуTypeError: cannot pickle 'generator' object
В некоторых случаях наиболее простым вариантом будет реализовать pickle
сериализацию для пользовательского класса, но чаще всего это слишком трудозатратно. К тому же pickle сериализация небезопасна.
К счастью, есть другой способ.
Имплементация пользовательского прокси и менеджера
Чтобы мы могли работать с пользовательским классом по тому же принципу, что и со встроенными типами, нам необходимо создать соответствующие прокси‑классы и свой собственный менеджер.
От менеджера нам не надо ничего особенного, достаточно наследовать его от базового менеджера. Можно конечно использовать и сам базовый менеджер, но правила хорошего тона предписывают нам не влезать в стандартные классы чужих модулей. Поэтому мы создадим свой собственный класс:
from multiprocessing.managers import BaseManager
class MyManager(BaseManager):
pass # да, и это все
Далее возьмем определение MyAwesomeClass
(пока без генератора) и посмотрим подробнее, что нам нужно
class MyAwesomeClass:
def __init__(self):
self.x = 100
def __str__(self):
return str(self.x)
def change(self):
self.x = 200
x — публичный атрибут класса. Мы хотим, чтобы можно было его читать и изменять. Этот механизм уже реализован в классе multiprocessing.managers.NamespaceProxy
, поэтому возьмем его в качестве родительского класса.
from multiprocessing.managers import NamespaceProxy
class MyAwesomeClassProxy(NamespaceProxy):
pass # да, и тут тоже больше ничего не надо
По умолчанию NamespaceProxy
позволяет нам обращаться ко всем публичным функциям и читать и изменять все публичные поля класса. Найти внятного описания в документации у меня не вышло, поэтому пришлось залезть в код и посмотреть, как это работает. Далее в статье я приведу кусок этого кода, но пока просто примите это на веру.
Теперь мы можем зарегистрировать класс в менеджере и использовать зарегистрированный тип для создания объектов
MyManager.register('MyAwesomeClassRegistred', MyAwesomeClass, MyAwesomeClassProxy)
M = MyManager()
M.start()
MAC = M.MyAwesomeClassRegistred()
Подробнее о методе register можно прочитать в документации. Нам будут важны первые четыре параметра.
register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
typeid - это "идентификатор типа", который будет использован в менеджере
callable - это вызываемый объект, используемый для создания объектов для этого идентификатора типа. В данном конкретном случае – имя класса.
proxytype - это подкласс BaseProxy, который используется для создания прокси-объектов для общих объектов с этим идентификатором типа. Если None, то класс прокси создается автоматически (AutoProxy).
exposed используется для указания последовательности имен методов исходного класса, к которым прокси-объект должны иметь доступ с использованием BaseProxy.callmethod(). (Если exposed равен None, то вместо него используется proxytype.exposed, если он существует.) В случае, если список exposed не указан, для прокси-объекта будут доступны все публичные методы исходного класса.
Вот как выглядит решение с прокси-классом целиком
import multiprocessing as mp
from multiprocessing.managers import NamespaceProxy, BaseManager
class MyAwesomeClass:
def __init__(self):
self.x = 100
def __str__(self):
return str(self.x)
def changer(obj):
obj.x = 200
print('Hello! ',obj.x)
class MyManager(BaseManager):
pass
class MyAwesomeClassProxy(NamespaceProxy):
pass
MyManager.register('MyAwesomeClassRegistred', MyAwesomeClass, MyAwesomeClassProxy)
if __name__ == '__main__':
M = MyManager()
M.start()
MAC = M.MyAwesomeClassRegistred()
p = mp.Process(target = changer, args=(MAC,))
p.start()
p.join()
print('Here I am: ', MAC.x)
Результат:
Hello! 200
Here I am: 200
Создание собственного прокси‑класса сделало код более читаемым. Теперь объект передается и меняется в явном виде без дополнительных оберток и переприсваиваний.
Давайте подробнее изучим, как устроен прокси‑класс. Реализация NamespaceProxy
уже достаточно сложная, так что чтобы разобраться, как работает вызов функций и обращение к полям класса, давайте сначала поработаем с BaseProxy
.
Разберем несколько примеров с exposed
и различными способами вызова функций.
import multiprocessing as mp
from multiprocessing.managers import BaseProxy, BaseManager
class MyManager(BaseManager):
pass # да, и это все
class MyAwesomeClass:
def __init__(self):
self.x = 100
def awesome_func(self):
print("Hello! I'm MyAwesomeClass. It's awesome_func")
def idle_func(self):
print("Hello! I'm MyAwesomeClass. It's idle_func")
def exposed_func(self):
print("Hello! I'm MyAwesomeClass. It's exposed_func")
def hidden_func(self):
print("Hello! I'm MyAwesomeClass. It's hidden_func")
class MyAwesomeClassProxy(BaseProxy):
_exposed_ = ('exposed_func', )
def exposed_func(self):
print('First we hit MyAwesomeClassProxy ')
self._callmethod('exposed_func')
def idle_func(self):
print('We are in MyAwesomeClassProxy, idle_func')
def hidden_func_proxy(self):
self._callmethod('hidden_func')
def hidden_func_proxy2(self):
self.hidden_func()
def allCaller(obj):
#obj.awesome_func()
#obj.idle_func()
obj.exposed_func()
#obj.hidden_func_proxy()
#obj.hidden_func_proxy2()
MyManager.register('MyAwesomeClassRegistred', MyAwesomeClass, MyAwesomeClassProxy)
if __name__ == '__main__':
M = MyManager()
M.start()
MAC = M.MyAwesomeClassRegistred()
allCaller(MAC)
Теперь, если мы передадим объект obj класса MyAwesomeClass
в функцию allCaller
и будем по очереди раскомментировать строки и запускать интерпретатор, то получим следующий результат.
>>obj.awesome_func()
AttributeError: 'MyAwesomeClassProxy' object has no attribute 'awesome_func'
Функция awesome_func
была объявлена только в исходном классе, никакого упоминания о ней нет в прокси-классе. Когда мы обращаемся в переданному в процесс объекту, мы на самом деле обращаемся к его прокси‑объекту, который имеет уже другой тип — не MyAwesomeClass
, а MyAwesomeClassProxy
. Соответственно, нам доступны только поля и функции прокси‑класса. Именно поэтому мы получаем AttributeError.
>>obj.idle_func()
We are in MyAwesomeClassProxy, idle_func
Функция idle_func
объявлена в обоих классах. Но так как мы обращаемся к прокси-объекту, то активируется idle_func
прокси-класса. До idle_func
в MyAwesomeClass
мы таким образом добраться не можем.
>>obj.exposed_func()
First we hitMyAwesomeClassProxy
Hello! I'm MyAwesomeClass. It's exposed_fun
Функция exposed_func
добавлена в кортеж _exposed_
прокси-класса, поэтому из прокси-класса мы можем обратиться к exposed_func
изначального класса с помощью метода _callmethod
. Так как это приватный метод прокси-класса, нам нужно сделать функцию-обертку, которую мы и будем использовать в основном коде.
>>obj.hidden_func_proxy()
AttributeError: method 'hidden_func' of <class 'mp_main.MyAwesomeClass'> object is not in exposed={'exposed_func'}
Если метод не добавлен в кортеж exposed,
то _callmethod
вызовет исключение, как в случае с hidden_func_proxy.
>>obj.hidden_func_proxy2()
AttributeError: 'MyAwesomeClassProxy' object has no attribute 'hidden_func'
И наконец в последнем примере происходит такая же ошибка, как в первом. Self
указывает на объект прокси-класса, у которого не определена функция hidden_func
.
Такой же принцип действует и для полей класса, ведь на самом деле, встречая код obj.x
, интерпретатор вызывает встроенную функцию классаgetattribute(self, «x»)
. Соответственно, необходимо написать соответствующую функцию для прокси‑класса, что и реализовано в NamespaceProxy
:
_exposed_ = (‘__getattribute__’, )
# здесь идет работа и с методами, и с полями класса
def __getattr__(self, key):
if key[0] == '_':
return object.__getattribute__(self, key)
callmethod = object.__getattribute__(self, '_callmethod')
return callmethod('__getattribute__', (key,))
Класс со вложенным генератором
Вернемся к примеру со вложенным генератором. К сожалению, генератор не является pickable
объектом, поэтому мы не сможем его передать через Manager. При этом нам важно, чтобы состояние генератора было согласовано между процессами, чтобы получать корректную последовательность значений, значит, вариант с копией нам тоже не подходит. Один из вариантов решения этой проблемы – обернуть вызов генератора в функцию класса.
from multiprocessing import Manager, Process, Lock
from multiprocessing.managers import BaseManager, NamespaceProxy, BaseProxy, MakeProxyType
import time
def baz():
for i in range(10):
yield i*i
class MyAwesomeClass:
def __init__(self):
self.x = 100
self.gen = baz()
def __str__(self):
return str(self.x)
def next(self):
return self.gen.__next__()
def demoFunction(obj, num, ConsoleLock):
for i in range(4):
ConsoleLock.acquire() # захватываем замок, чтобы второй процесс не вмешался в вывод текста в консоль
print(num, 'Gen: ', obj.next())
print(num, 'Proxy ownVar = ', obj.ownVar)
ConsoleLock.release() # освободили консоль – освободили замок
time.sleep(1)
class MyManager(BaseManager):
pass
class MyAwesomeClassProxy(NamespaceProxy):
_exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'next')
def __init__(self, *args, **kwargs):
super().__init__(*args,**kwargs)
self.ownVar = 10
def next(self):
self.ownVar += 1
return self._callmethod('next')
MyManager.register('MyAwesomeClassRegistred', MyAwesomeClass, MyAwesomeClassProxy)
if __name__ == '__main__':
M = MyManager()
M.start()
MAC = M.MyAwesomeClassRegistred()
# замок нам понадобится, чтобы процессы не конфликтовали
# за консоль при печати,
# а не то вывод перемешается и будет ничего непонятно
ConsoleLock = Lock()
p = Process(target = demoFunction, args=(MAC,1, ConsoleLock))
p2 = Process(target = demoFunction, args=(MAC,2,ConsoleLock))
p.start()
p2.start()
p.join()
p2.join()
Результат:
1 Gen: 0
1 Proxy ownVar = 11
2 Gen: 1
2 Proxy ownVar = 12
1 Gen: 4
1 Proxy ownVar = 13
2 Gen: 9
2 Proxy ownVar = 14
1 Gen: 16
1 Proxy ownVar = 15
2 Gen: 25
2 Proxy ownVar = 16
1 Gen: 36
1 Proxy ownVar = 17
2 Gen: 49
2 Proxy ownVar = 18
Здесь важно обратить внимание на работу с генератором. Обращение к генератору и методу next()
происходит только внутри исходного класса. Прокси‑класс вызывает метод next()
исходного класса (поэтому название этого метода добавлено в exposed
). То есть объект‑генератор не передается и не копируется в прокси‑объект. Мы видим, что его состояние меняется обоими процессами, т. е. оба процесса используют один и тот же генератор.
Однако у каждого процесса есть свой собственный прокси‑объект, к которому они и обращаются. Это видно на примере поля ownVar
— собственного поля прокси‑класса, которое отсутствует у исходного класса. Поле ownVar
меняется у прокси‑объектов независимо друг от друга.
Подводим итоги
Передача данных между процессами замедляет работу программы в целом и кроме этого усложняет логику. В ситуациях, когда невозможно полностью избежать передачи данных, лучше ограничиться встроенными типами данных и механизмом Shared Memory
. Применение собственных Proxy
и Manager
следует рассматривать только в случаях, когда других вариантов не осталось.
Написание и отладка собственного прокси‑класса с более или менее сложной структурой являются довольно трудоемкой задачей. Вложенные структуры данных могут вызывать проблемы, и нужно очень внимательно следить за тем, какие объекты изменяются в процессах, чтобы прописать соответствующую логику в прокси‑классе. Будьте готовы к тому, что простого решения не получится. Бояться, разумеется, этого тоже не стоит, просто надо трезво оценивать объем работы и не изобретать очередной велосипед.
Некоторые объекты и вовсе нельзя передать из одного процесса в другой, поэтому приходится придумывать обходные пути. Но об этом я напишу в другой раз.