Ik probeer een gedeeltelijke functie te gebruiken zodat pool.map() een functie kan targeten die meer dan één parameter heeft (in dit geval een Lock()-object).
Hier is een voorbeeldcode (uit een antwoord op een eerdere vraag van mij):
from functools import partial
def target(lock, iterable_item):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
l = multiprocessing.Lock()
func = partial(target, l)
pool.map(func, iterable)
pool.close()
pool.join()
Als ik deze code echter uitvoer, krijg ik de foutmelding:
Runtime Error: Lock objects should only be shared between processes through inheritance.
Wat mis ik hier? Hoe kan ik de vergrendeling delen tussen mijn subprocessen?
Antwoord 1, autoriteit 100%
Je kunt geen normale multiprocessing.Lock
-objecten doorgeven aan Pool
-methoden, omdat ze niet kunnen worden gepickt. Er zijn twee manieren om dit te omzeilen. Een daarvan is het maken van Manager()
en geef een Manager.Lock()
:
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
m = multiprocessing.Manager()
l = m.Lock()
func = partial(target, l)
pool.map(func, iterable)
pool.close()
pool.join()
Dit is echter een beetje zwaargewicht; het gebruik van een Manager
vereist een ander proces om de Manager
-server te hosten. En alle oproepen om de vergrendeling acquire
/release
te sturen, moeten via IPC naar die server worden gestuurd.
De andere optie is om de normale multiprocessing.Lock()
door te geven bij het maken van de pool, met behulp van de initializer
kwarg. Dit maakt uw slotinstantie globaal in alle onderliggende werkers:
def target(iterable_item):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
def init(l):
global lock
lock = l
def main():
iterable = [1, 2, 3, 4, 5]
l = multiprocessing.Lock()
pool = multiprocessing.Pool(initializer=init, initargs=(l,))
pool.map(target, iterable)
pool.close()
pool.join()
De tweede oplossing heeft als neveneffect dat partial
niet langer nodig is.
Antwoord 2
Hier is een versie (die Barrier
gebruikt in plaats van Lock
, maar je snapt het idee) die ook zou werken op Windows (waar de ontbrekende fork
veroorzaakt extra problemen):
import multiprocessing as mp
def procs(uid_barrier):
uid, barrier = uid_barrier
print(uid, 'waiting')
barrier.wait()
print(uid, 'past barrier')
def main():
N_PROCS = 10
with mp.Manager() as man:
barrier = man.Barrier(N_PROCS)
with mp.Pool(N_PROCS) as p:
p.map(procs, ((uid, barrier) for uid in range(N_PROCS)))
if __name__ == '__main__':
mp.freeze_support()
main()