In de Python multiprocessing
-bibliotheek, is er een variant van pool.map
die meerdere argumenten ondersteunt?
text = "test"
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text,case),case, 1)
pool.close()
pool.join()
Antwoord 1, autoriteit 100%
Het antwoord hierop is versie- en situatieafhankelijk. Het meest algemene antwoord voor recente versies van Python (sinds 3.3) werd hieronder voor het eerst beschreven door J.F. Sebastian.1Het gebruikt de Pool.starmap
methode, die een reeks argument-tupels accepteert. Het pakt dan automatisch de argumenten van elke tuple uit en geeft ze door aan de gegeven functie:
import multiprocessing
from itertools import product
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with multiprocessing.Pool(processes=3) as pool:
results = pool.starmap(merge_names, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
Voor eerdere versies van Python moet je een helperfunctie schrijven om de argumenten expliciet uit te pakken. Als je with
wilt gebruiken, moet je ook een wrapper schrijven om van Pool
een contextmanager te maken. (Met dank aan muonvoor het wijzen dit uit.)
import multiprocessing
from itertools import product
from contextlib import contextmanager
def merge_names(a, b):
return '{} & {}'.format(a, b)
def merge_names_unpack(args):
return merge_names(*args)
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(merge_names_unpack, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
In eenvoudigere gevallen, met een vast tweede argument, kun je ook partial
gebruiken, maar alleen in Python 2.7+.
import multiprocessing
from functools import partial
from contextlib import contextmanager
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(partial(merge_names, b='Sons'), names)
print(results)
# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1. Veel hiervan was geïnspireerd door zijn antwoord, dat in plaats daarvan waarschijnlijk had moeten worden geaccepteerd. Maar aangezien deze bovenaan blijft hangen, leek het me het beste om hem te verbeteren voor toekomstige lezers.
Antwoord 2, autoriteit 94%
is er een variant van pool.map die meerdere argumenten ondersteunt?
Python 3.3 bevat pool.starmap()
methode:
#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support
def func(a, b):
return a + b
def main():
a_args = [1,2,3]
second_arg = 1
with Pool() as pool:
L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
M = pool.starmap(func, zip(a_args, repeat(second_arg)))
N = pool.map(partial(func, b=second_arg), a_args)
assert L == M == N
if __name__=="__main__":
freeze_support()
main()
Voor oudere versies:
#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support
def func(a, b):
print a, b
def func_star(a_b):
"""Convert `f([1,2])` to `f(1,2)` call."""
return func(*a_b)
def main():
pool = Pool()
a_args = [1,2,3]
second_arg = 1
pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))
if __name__=="__main__":
freeze_support()
main()
Uitvoer
1 1
2 1
3 1
Zie hoe itertools.izip()
en itertools.repeat()
worden hier gebruikt.
Vanwege de bug genoemd door @unutbukun je functools.partial()
of vergelijkbare mogelijkheden op Python 2.6, dus de eenvoudige wrapper-functie func_star()
moet expliciet worden gedefinieerd. Zie ook de oplossingvoorgesteld door uptimebox
.
Antwoord 3, autoriteit 34%
Ik denk dat het onderstaande beter zal zijn
def multi_run_wrapper(args):
return add(*args)
def add(x,y):
return x+y
if __name__ == "__main__":
from multiprocessing import Pool
pool = Pool(4)
results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
print results
uitvoer
[3, 5, 7]
Antwoord 4, autoriteit 17%
Python 3.3+gebruiken met pool.starmap():
from multiprocessing.dummy import Pool as ThreadPool
def write(i, x):
print(i, "---", x)
a = ["1","2","3"]
b = ["4","5","6"]
pool = ThreadPool(2)
pool.starmap(write, zip(a,b))
pool.close()
pool.join()
Resultaat:
1 --- 4
2 --- 5
3 --- 6
Je kunt ook meer argumenten zip() als je wilt: zip(a,b,c,d,e)
Als u een constante waardeals argument wilt doorgeven:
import itertools
zip(itertools.repeat(constant), a)
In het geval dat uw functie iets retourneert:
results = pool.starmap(write, zip(a,b))
Dit geeft een lijst met de geretourneerde waarden.
Antwoord 5, autoriteit 14%
Hoe meerdere argumenten te nemen:
def f1(args):
a, b, c = args[0] , args[1] , args[2]
return a+b+c
if __name__ == "__main__":
import multiprocessing
pool = multiprocessing.Pool(4)
result1 = pool.map(f1, [ [1,2,3] ])
print(result1)
Antwoord 6, autoriteit 6%
Nadat ik heb geleerd over itertools in J.F. Sebastianantwoord Ik besloot een stap verder te gaan en een parmap
-pakket te schrijven dat zorgt voor parallellisatie, met map
en starmap
functies op python-2.7 en python-3.2 (en later ook) die elk aantalpositionele argumenten kunnen aannemen.
Installatie
pip install parmap
Hoe te parallelliseren:
import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)
# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)
# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
Ik heb parmap geüpload naar PyPI en naar een github-repository.
Als voorbeeld kan de vraag als volgt worden beantwoord:
import parmap
def harvester(case, text):
X = case[0]
text+ str(X)
if __name__ == "__main__":
case = RAW_DATASET # assuming this is an iterable
parmap.map(harvester, case, "test", chunksize=1)
Antwoord 7, autoriteit 3%
Er is een splitsing van multiprocessing
genaamd pathos(opmerking: gebruik de versie op github) die geen starmap
nodig heeft — de kaartfuncties weerspiegelen de API voor de python-kaart, dus de kaart kan meerdere argumenten bevatten. Met pathos
kun je over het algemeen ook multiprocessing doen in de interpreter, in plaats van vast te zitten in het __main__
-blok. Pathos moet worden uitgebracht, na wat milde updates — meestal conversie naar python 3.x.
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def func(a,b):
... print a,b
...
>>>
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> pool.map(func, [1,2,3], [1,1,1])
1 1
2 1
3 1
[None, None, None]
>>>
>>> # also can pickle stuff like lambdas
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
>>> # also does asynchronous map
>>> result = pool.amap(pow, [1,2,3], [4,5,6])
>>> result.get()
[1, 32, 729]
>>>
>>> # or can return a map iterator
>>> result = pool.imap(pow, [1,2,3], [4,5,6])
>>> result
<processing.pool.IMapIterator object at 0x110c2ffd0>
>>> list(result)
[1, 32, 729]
pathos
heeft verschillende manieren waarop je het exacte gedrag van starmap
kunt krijgen.
>>> def add(*x):
... return sum(x)
...
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>>
Antwoord 8, autoriteit 2%
Een betere oplossing voor python2:
from multiprocessing import Pool
def func((i, (a, b))):
print i, a, b
return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
2 3 4
1 2 3
0 1 2
OUT []:
[3, 5, 7]
Antwoord 9, Autoriteit 2%
U kunt de volgende twee functies gebruiken om te voorkomen dat u een wikkel wilt schrijven voor elke nieuwe functie:
import itertools
from multiprocessing import Pool
def universal_worker(input_pair):
function, args = input_pair
return function(*args)
def pool_args(function, *args):
return zip(itertools.repeat(function), zip(*args))
Gebruik de functie function
met de lijsten van argumenten arg_0
, arg_1
EN arg_2
ALS VOLGERS:
pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
Antwoord 10, Autoriteit 2%
Nog een eenvoudig alternatief is om uw functieparameters in een tuple in te wikkelen en vervolgens de parameters in te wikkelen die ook in tuples moeten worden gepasseerd. Dit is misschien niet ideaal bij het omgaan met grote stukjes gegevens. Ik geloof dat het voor elke tuple kopieën zou maken.
from multiprocessing import Pool
def f((a,b,c,d)):
print a,b,c,d
return a + b + c +d
if __name__ == '__main__':
p = Pool(10)
data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
print(p.map(f, data))
p.close()
p.join()
geeft de uitvoer in een willekeurige volgorde:
0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Antwoord 11, Autoriteit 2%
Een betere manier is het gebruik van decorateur in plaats van het schrijven van wrapper functie met de hand. Zeker als je een heleboel functies in kaart brengen, zal decorateur uw tijd te besparen door het vermijden van het schrijven wrapper voor elke functie. Meestal een versierde functie is niet picklable, maar we functools
kan gebruiken om rond te krijgen. Meer disscusions kan worden gevonden hier .
Hier het voorbeeld
def unpack_args(func):
from functools import wraps
@wraps(func)
def wrapper(args):
if isinstance(args, dict):
return func(**args)
else:
return func(*args)
return wrapper
@unpack_args
def func(x, y):
return x + y
Dan kunt u deze kaart te brengen met rits argumenten
np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
Natuurlijk, kunt u altijd gebruik maken van Pool.starmap
Python 3 (& gt; = 3,3). zoals vermeld in andere antwoorden
Antwoord 12
Hier is een andere manier om het te doen dat IMHO is eenvoudiger en eleganter dan een van de andere antwoorden.
Dit programma heeft een functie die twee parameters neemt, print ze uit en drukt ook de som:
import multiprocessing
def main():
with multiprocessing.Pool(10) as pool:
params = [ (2, 2), (3, 3), (4, 4) ]
pool.starmap(printSum, params)
# end with
# end function
def printSum(num1, num2):
mySum = num1 + num2
print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function
if __name__ == '__main__':
main()
uitvoer is:
num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8
Bekijk de Python-documenten voor meer informatie:
https://docs.python.org/3/ library/multiprocessing.html#module-multiprocessing.pool
Bekijk vooral de functie starmap
.
Ik gebruik Python 3.6, ik weet niet zeker of dit zal werken met oudere Python-versies
Waarom er niet zo’n duidelijk voorbeeld als dit in de documenten staat, weet ik niet zeker.
Antwoord 13
Een andere manier is om een lijst met lijsten door te geven aan een routine met één argument:
import os
from multiprocessing import Pool
def task(args):
print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]
pool = Pool()
pool.map(task, [
[1,2],
[3,4],
[5,6],
[7,8]
])
Men kan dan een lijst met argumenten maken met de eigen favoriete methode.
Antwoord 14
Vanaf python 3.4.4 kunt u multiprocessing.get_context() gebruiken om een contextobject te verkrijgen om meerdere startmethoden te gebruiken:
import multiprocessing as mp
def foo(q, h, w):
q.put(h + ' ' + w)
print(h + ' ' + w)
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,'hello', 'world'))
p.start()
print(q.get())
p.join()
Of je vervangt gewoon
pool.map(harvester(text,case),case, 1)
door:
pool.apply_async(harvester(text,case),case, 1)
Antwoord 15
In de officiële documentatie staat dat het slechts één itereerbaar argument ondersteunt. In dergelijke gevallen gebruik ik graag apply_async. In jouw geval zou ik het volgende doen:
from multiprocessing import Process, Pool, Manager
text = "test"
def harvester(text, case, q = None):
X = case[0]
res = text+ str(X)
if q:
q.put(res)
return res
def block_until(q, results_queue, until_counter=0):
i = 0
while i < until_counter:
results_queue.put(q.get())
i+=1
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
m = Manager()
q = m.Queue()
results_queue = m.Queue() # when it completes results will reside in this queue
blocking_process = Process(block_until, (q, results_queue, len(case)))
blocking_process.start()
for c in case:
try:
res = pool.apply_async(harvester, (text, case, q = None))
res.get(timeout=0.1)
except:
pass
blocking_process.join()
Antwoord 16
Er zijn hier veel antwoorden, maar geen enkele lijkt Python 2/3-compatibele code te bieden die op elke versie werkt. Als u wilt dat uw code gewoon werkt, werkt dit voor beide Python-versies:
# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
from contextlib import contextmanager
@contextmanager
def multiprocessing_context(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
else:
multiprocessing_context = multiprocessing.Pool
Daarna kun je multiprocessing gebruiken op de reguliere Python 3-manier, zoals je wilt. Bijvoorbeeld:
def _function_to_run_for_each(x):
return x.lower()
with multiprocessing_context(processes=3) as pool:
results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim']) print(results)
werkt in Python 2 of Python 3.
Antwoord 17
text = "test"
def unpack(args):
return args[0](*args[1:])
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
# args is a list of tuples
# with the function to execute as the first item in each tuple
args = [(harvester, text, c) for c in case]
# doing it this way, we can pass any function
# and we don't need to define a wrapper for each different function
# if we need to use more than one
pool.map(unpack, args)
pool.close()
pool.join()
Antwoord 18
Dit is een voorbeeld van de routine die ik gebruik om meerdere argumenten door te geven aan een functie met één argument die wordt gebruikt in een pool.imapvork:
from multiprocessing import Pool
# Wrapper of the function to map:
class makefun:
def __init__(self, var2):
self.var2 = var2
def fun(self, i):
var2 = self.var2
return var1[i] + var2
# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]
# Open the pool:
pool = Pool(processes=2)
# Wrapper loop
for j in range(len(var2)):
# Obtain the function to map
pool_fun = makefun(var2[j]).fun
# Fork loop
for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
print(var1[i], '+' ,var2[j], '=', value)
# Close the pool
pool.close()
Antwoord 19
Dit is misschien een andere optie. De truc zit in de functie wrapper
die een andere functie retourneert die wordt doorgegeven aan pool.map
. De onderstaande code leest een invoerarray en retourneert voor elk (uniek) element daarin hoe vaak (dwz telt) dat element in de array voorkomt, bijvoorbeeld als de invoer is
np.eye(3) = [ [1. 0. 0.]
[0. 1. 0.]
[0. 0. 1.]]
dan verschijnt nul 6 keer en één 3 keer
import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count
def extract_counts(label_array):
labels = np.unique(label_array)
out = extract_counts_helper([label_array], labels)
return out
def extract_counts_helper(args, labels):
n = max(1, cpu_count() - 1)
pool = ThreadPool(n)
results = {}
pool.map(wrapper(args, results), labels)
pool.close()
pool.join()
return results
def wrapper(argsin, results):
def inner_fun(label):
label_array = argsin[0]
counts = get_label_counts(label_array, label)
results[label] = counts
return inner_fun
def get_label_counts(label_array, label):
return sum(label_array.flatten() == label)
if __name__ == "__main__":
img = np.ones([2,2])
out = extract_counts(img)
print('input array: \n', img)
print('label counts: ', out)
print("========")
img = np.eye(3)
out = extract_counts(img)
print('input array: \n', img)
print('label counts: ', out)
print("========")
img = np.random.randint(5, size=(3, 3))
out = extract_counts(img)
print('input array: \n', img)
print('label counts: ', out)
print("========")
U moet:
input array:
[[1. 1.]
[1. 1.]]
label counts: {1.0: 4}
========
input array:
[[1. 0. 0.]
[0. 1. 0.]
[0. 0. 1.]]
label counts: {0.0: 6, 1.0: 3}
========
input array:
[[4 4 0]
[2 4 3]
[2 3 1]]
label counts: {0: 1, 1: 1, 2: 2, 3: 2, 4: 3}
========
Antwoord 20
Bewaar al uw argumenten als een reeks van tuples .
Voorbeeld Zeg normaal gesproken belt u uw functie als
def mainImage(fragCoord : vec2, iResolution : vec3, iTime : float) -> vec3:
Pas in plaats daarvan één tuple en pak de argumenten uit
def mainImage(package_iter) -> vec3:
fragCoord=package_iter[0]
iResolution=package_iter[1]
iTime=package_iter[2]
Bouw de tuple op door een lus te gebruiken voor de hand
package_iter = []
iResolution = vec3(nx,ny,1)
for j in range( (ny-1), -1, -1):
for i in range( 0, nx, 1):
fragCoord : vec2 = vec2(i,j)
time_elapsed_seconds = 10
package_iter.append( (fragCoord, iResolution, time_elapsed_seconds) )
voer vervolgens alles uit met behulp van de kaart door de ARRAY van TUPLES door te geven
array_rgb_values = []
with concurrent.futures.ProcessPoolExecutor() as executor:
for val in executor.map(mainImage, package_iter):
fragColor=val
ir = clip( int(255* fragColor.r), 0, 255)
ig = clip(int(255* fragColor.g), 0, 255)
ib= clip(int(255* fragColor.b), 0, 255)
array_rgb_values.append( (ir,ig,ib) )
Ik weet dat Python * en ** heeft voor het uitpakken , maar ik heb die nog niet geprobeerd.
Het is ook beter om gelijktijdige futures van een bibliotheek op een hoger niveau te gebruiken dan de multiprocessing-bibliotheek op een laag niveau
Antwoord 21
import time
from multiprocessing import Pool
def f1(args):
vfirst, vsecond, vthird = args[0] , args[1] , args[2]
print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')
pass
if __name__ == '__main__':
p = Pool()
result = p.map(f1, [['Dog','Cat','Mouse']])
p.close()
p.join()
print(result)
Antwoord 22
voor python2 kun je deze truc gebruiken
def fun(a,b):
return a+b
pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))