Python deelt een slot tussen processen

Ik probeer een gedeeltelijke functie te gebruiken zodat pool.map() een functie kan targeten die meer dan één parameter heeft (in dit geval een Lock()-object).

Hier is een voorbeeldcode (uit een antwoord op een eerdere vraag van mij):

from functools import partial
def target(lock, iterable_item):
    for item in items:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()
def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    l = multiprocessing.Lock()
    func = partial(target, l)
    pool.map(func, iterable)
    pool.close()
    pool.join()

Als ik deze code echter uitvoer, krijg ik de foutmelding:

Runtime Error: Lock objects should only be shared between processes through inheritance.

Wat mis ik hier? Hoe kan ik de vergrendeling delen tussen mijn subprocessen?


Antwoord 1, autoriteit 100%

Je kunt geen normale multiprocessing.Lock-objecten doorgeven aan Pool-methoden, omdat ze niet kunnen worden gepickt. Er zijn twee manieren om dit te omzeilen. Een daarvan is het maken van Manager()en geef een Manager.Lock():

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    m = multiprocessing.Manager()
    l = m.Lock()
    func = partial(target, l)
    pool.map(func, iterable)
    pool.close()
    pool.join()

Dit is echter een beetje zwaargewicht; het gebruik van een Managervereist een ander proces om de Manager-server te hosten. En alle oproepen om de vergrendeling acquire/releasete sturen, moeten via IPC naar die server worden gestuurd.

De andere optie is om de normale multiprocessing.Lock()door te geven bij het maken van de pool, met behulp van de initializerkwarg. Dit maakt uw slotinstantie globaal in alle onderliggende werkers:

def target(iterable_item):
    for item in items:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()
def init(l):
    global lock
    lock = l
def main():
    iterable = [1, 2, 3, 4, 5]
    l = multiprocessing.Lock()
    pool = multiprocessing.Pool(initializer=init, initargs=(l,))
    pool.map(target, iterable)
    pool.close()
    pool.join()

De tweede oplossing heeft als neveneffect dat partialniet langer nodig is.


Antwoord 2

Hier is een versie (die Barriergebruikt in plaats van Lock, maar je snapt het idee) die ook zou werken op Windows (waar de ontbrekende forkveroorzaakt extra problemen):

import multiprocessing as mp
def procs(uid_barrier):
    uid, barrier = uid_barrier
    print(uid, 'waiting')
    barrier.wait()
    print(uid, 'past barrier')    
def main():
    N_PROCS = 10
    with mp.Manager() as man:
        barrier = man.Barrier(N_PROCS)
        with mp.Pool(N_PROCS) as p:
            p.map(procs, ((uid, barrier) for uid in range(N_PROCS)))
if __name__ == '__main__':
    mp.freeze_support()
    main()

Other episodes