Python-multiprocessing en een gedeelde teller

Ik heb problemen met de multiprocessing-module. Ik gebruik een pool van werkers met zijn kaartmethode om gegevens uit veel bestanden te laden en voor elk van hen analyseer ik gegevens met een aangepaste functie. Elke keer dat een bestand is verwerkt, zou ik graag een teller willen laten bijwerken, zodat ik kan bijhouden hoeveel bestanden er nog moeten worden verwerkt.
Hier is een voorbeeldcode:

def analyze_data( args ):
    # do something 
    counter += 1
    print counter
if __name__ == '__main__':
    list_of_files = os.listdir(some_directory)
    global counter
    counter = 0
    p = Pool()
    p.map(analyze_data, list_of_files)

Ik kan hier geen oplossing voor vinden.


Antwoord 1, autoriteit 100%

Het probleem is dat de variabele counterniet wordt gedeeld tussen uw processen: elk afzonderlijk proces maakt zijn eigen lokale instantie en verhoogt die.

Zie dit gedeeltevan de documentatie voor sommige technieken die u kunt gebruiken om de status tussen uw processen te delen. In jouw geval wil je misschien een Valueinstantie tussen uw werknemers

Hier is een werkende versie van uw voorbeeld (met enkele dummy-invoergegevens). Merk op dat het globale waarden gebruikt die ik in de praktijk echt zou proberen te vermijden:

from multiprocessing import Pool, Value
from time import sleep
counter = None
def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args
def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print counter.value
    return args * 10
if __name__ == '__main__':
    #inputs = os.listdir(some_directory)
    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]
    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer = init, initargs = (counter, ))
    i = p.map_async(analyze_data, inputs, chunksize = 1)
    i.wait()
    print i.get()

Antwoord 2, autoriteit 52%

Tegenklasse zonder de race-conditie-bug:

class Counter(object):
    def __init__(self):
        self.val = multiprocessing.Value('i', 0)
    def increment(self, n=1):
        with self.val.get_lock():
            self.val.value += n
    @property
    def value(self):
        return self.val.value

Antwoord 3, autoriteit 15%

Een extreem eenvoudig voorbeeld, gewijzigd ten opzichte van het antwoord van jkp:

from multiprocessing import Pool, Value
from time import sleep
counter = Value('i', 0)
def f(x):
    global counter
    with counter.get_lock():
        counter.value += 1
    print("counter.value:", counter.value)
    sleep(1)
    return x
with Pool(4) as p:
    r = p.map(f, range(1000*1000))

Antwoord 4, autoriteit 7%

Sneller tellerklasse zonder tweemaal het ingebouwde waardeslot te gebruiken

class Counter(object):
    def __init__(self, initval=0):
        self.val = multiprocessing.RawValue('i', initval)
        self.lock = multiprocessing.Lock()
    def increment(self):
        with self.lock:
            self.val.value += 1
    @property
    def value(self):
        return self.val.value

https://eli.thegreenplace.net/ 2012/01/04/shared-counter-with-pythons-multiprocessing
https://docs.python.org/2/Library/multiprocessing .html # multiprocessing.sharedctypes.value
https://docs.python.org/2/Library/Multiprocessing .html # multiprocessing.sharedctypes.rawvalue

Other episodes