Ik heb problemen met de multiprocessing-module. Ik gebruik een pool van werkers met zijn kaartmethode om gegevens uit veel bestanden te laden en voor elk van hen analyseer ik gegevens met een aangepaste functie. Elke keer dat een bestand is verwerkt, zou ik graag een teller willen laten bijwerken, zodat ik kan bijhouden hoeveel bestanden er nog moeten worden verwerkt.
Hier is een voorbeeldcode:
def analyze_data( args ):
# do something
counter += 1
print counter
if __name__ == '__main__':
list_of_files = os.listdir(some_directory)
global counter
counter = 0
p = Pool()
p.map(analyze_data, list_of_files)
Ik kan hier geen oplossing voor vinden.
Antwoord 1, autoriteit 100%
Het probleem is dat de variabele counter
niet wordt gedeeld tussen uw processen: elk afzonderlijk proces maakt zijn eigen lokale instantie en verhoogt die.
Zie dit gedeeltevan de documentatie voor sommige technieken die u kunt gebruiken om de status tussen uw processen te delen. In jouw geval wil je misschien een Value
instantie tussen uw werknemers
Hier is een werkende versie van uw voorbeeld (met enkele dummy-invoergegevens). Merk op dat het globale waarden gebruikt die ik in de praktijk echt zou proberen te vermijden:
from multiprocessing import Pool, Value
from time import sleep
counter = None
def init(args):
''' store the counter for later use '''
global counter
counter = args
def analyze_data(args):
''' increment the global counter, do something with the input '''
global counter
# += operation is not atomic, so we need to get a lock:
with counter.get_lock():
counter.value += 1
print counter.value
return args * 10
if __name__ == '__main__':
#inputs = os.listdir(some_directory)
#
# initialize a cross-process counter and the input lists
#
counter = Value('i', 0)
inputs = [1, 2, 3, 4]
#
# create the pool of workers, ensuring each one receives the counter
# as it starts.
#
p = Pool(initializer = init, initargs = (counter, ))
i = p.map_async(analyze_data, inputs, chunksize = 1)
i.wait()
print i.get()
Antwoord 2, autoriteit 52%
Tegenklasse zonder de race-conditie-bug:
class Counter(object):
def __init__(self):
self.val = multiprocessing.Value('i', 0)
def increment(self, n=1):
with self.val.get_lock():
self.val.value += n
@property
def value(self):
return self.val.value
Antwoord 3, autoriteit 15%
Een extreem eenvoudig voorbeeld, gewijzigd ten opzichte van het antwoord van jkp:
from multiprocessing import Pool, Value
from time import sleep
counter = Value('i', 0)
def f(x):
global counter
with counter.get_lock():
counter.value += 1
print("counter.value:", counter.value)
sleep(1)
return x
with Pool(4) as p:
r = p.map(f, range(1000*1000))
Antwoord 4, autoriteit 7%
Sneller tellerklasse zonder tweemaal het ingebouwde waardeslot te gebruiken
class Counter(object):
def __init__(self, initval=0):
self.val = multiprocessing.RawValue('i', initval)
self.lock = multiprocessing.Lock()
def increment(self):
with self.lock:
self.val.value += 1
@property
def value(self):
return self.val.value
https://eli.thegreenplace.net/ 2012/01/04/shared-counter-with-pythons-multiprocessing
https://docs.python.org/2/Library/multiprocessing .html # multiprocessing.sharedctypes.value
https://docs.python.org/2/Library/Multiprocessing .html # multiprocessing.sharedctypes.rawvalue