Hoe multiprocessing pool.map gebruiken met meerdere argumenten?

In de Python multiprocessing-bibliotheek, is er een variant van pool.mapdie meerdere argumenten ondersteunt?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()

Antwoord 1, autoriteit 100%

Het antwoord hierop is versie- en situatieafhankelijk. Het meest algemene antwoord voor recente versies van Python (sinds 3.3) werd hieronder voor het eerst beschreven door J.F. Sebastian.1Het gebruikt de Pool.starmapmethode, die een reeks argument-tupels accepteert. Het pakt dan automatisch de argumenten van elke tuple uit en geeft ze door aan de gegeven functie:

import multiprocessing
from itertools import product
def merge_names(a, b):
    return '{} & {}'.format(a, b)
if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Voor eerdere versies van Python moet je een helperfunctie schrijven om de argumenten expliciet uit te pakken. Als je withwilt gebruiken, moet je ook een wrapper schrijven om van Pooleen contextmanager te maken. (Met dank aan muonvoor het wijzen dit uit.)

import multiprocessing
from itertools import product
from contextlib import contextmanager
def merge_names(a, b):
    return '{} & {}'.format(a, b)
def merge_names_unpack(args):
    return merge_names(*args)
@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()
if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

In eenvoudigere gevallen, met een vast tweede argument, kun je ook partialgebruiken, maar alleen in Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager
@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()
def merge_names(a, b):
    return '{} & {}'.format(a, b)
if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)
# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Veel hiervan was geïnspireerd door zijn antwoord, dat in plaats daarvan waarschijnlijk had moeten worden geaccepteerd. Maar aangezien deze bovenaan blijft hangen, leek het me het beste om hem te verbeteren voor toekomstige lezers.


Antwoord 2, autoriteit 94%

is er een variant van pool.map die meerdere argumenten ondersteunt?

Python 3.3 bevat pool.starmap()methode:

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support
def func(a, b):
    return a + b
def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N
if __name__=="__main__":
    freeze_support()
    main()

Voor oudere versies:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support
def func(a, b):
    print a, b
def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)
def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))
if __name__=="__main__":
    freeze_support()
    main()

Uitvoer

1 1
2 1
3 1

Zie hoe itertools.izip()en itertools.repeat()worden hier gebruikt.

Vanwege de bug genoemd door @unutbukun je functools.partial()of vergelijkbare mogelijkheden op Python 2.6, dus de eenvoudige wrapper-functie func_star()moet expliciet worden gedefinieerd. Zie ook de oplossingvoorgesteld door uptimebox.


Antwoord 3, autoriteit 34%

Ik denk dat het onderstaande beter zal zijn

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

uitvoer

[3, 5, 7]

Antwoord 4, autoriteit 17%

Python 3.3+gebruiken met pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 
def write(i, x):
    print(i, "---", x)
a = ["1","2","3"]
b = ["4","5","6"] 
pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

Resultaat:

1 --- 4
2 --- 5
3 --- 6

Je kunt ook meer argumenten zip() als je wilt: zip(a,b,c,d,e)

Als u een constante waardeals argument wilt doorgeven:

import itertools
zip(itertools.repeat(constant), a)

In het geval dat uw functie iets retourneert:

results = pool.starmap(write, zip(a,b))

Dit geeft een lijst met de geretourneerde waarden.


Antwoord 5, autoriteit 14%

Hoe meerdere argumenten te nemen:

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c
if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 
    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)

Antwoord 6, autoriteit 6%

Nadat ik heb geleerd over itertools in J.F. Sebastianantwoord Ik besloot een stap verder te gaan en een parmap-pakket te schrijven dat zorgt voor parallellisatie, met mapen starmapfuncties op python-2.7 en python-3.2 (en later ook) die elk aantalpositionele argumenten kunnen aannemen.

Installatie

pip install parmap

Hoe te parallelliseren:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)
# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)
# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

Ik heb parmap geüpload naar PyPI en naar een github-repository.

Als voorbeeld kan de vraag als volgt worden beantwoord:

import parmap
def harvester(case, text):
    X = case[0]
    text+ str(X)
if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)

Antwoord 7, autoriteit 3%

Er is een splitsing van multiprocessinggenaamd pathos(opmerking: gebruik de versie op github) die geen starmapnodig heeft — de kaartfuncties weerspiegelen de API voor de python-kaart, dus de kaart kan meerdere argumenten bevatten. Met pathoskun je over het algemeen ook multiprocessing doen in de interpreter, in plaats van vast te zitten in het __main__-blok. Pathos moet worden uitgebracht, na wat milde updates — meestal conversie naar python 3.x.

 Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

pathosheeft verschillende manieren waarop je het exacte gedrag van starmapkunt krijgen.

>>> def add(*x):
...   return sum(x)
... 
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>> 

Antwoord 8, autoriteit 2%

Een betere oplossing voor python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

OUT []:

[3, 5, 7]


Antwoord 9, Autoriteit 2%

U kunt de volgende twee functies gebruiken om te voorkomen dat u een wikkel wilt schrijven voor elke nieuwe functie:

import itertools
from multiprocessing import Pool
def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)
def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Gebruik de functie functionmet de lijsten van argumenten arg_0, arg_1EN arg_2ALS VOLGERS:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()

Antwoord 10, Autoriteit 2%

Nog een eenvoudig alternatief is om uw functieparameters in een tuple in te wikkelen en vervolgens de parameters in te wikkelen die ook in tuples moeten worden gepasseerd. Dit is misschien niet ideaal bij het omgaan met grote stukjes gegevens. Ik geloof dat het voor elke tuple kopieën zou maken.

from multiprocessing import Pool
def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d
if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

geeft de uitvoer in een willekeurige volgorde:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

Antwoord 11, Autoriteit 2%

Een betere manier is het gebruik van decorateur in plaats van het schrijven van wrapper functie met de hand. Zeker als je een heleboel functies in kaart brengen, zal decorateur uw tijd te besparen door het vermijden van het schrijven wrapper voor elke functie. Meestal een versierde functie is niet picklable, maar we functoolskan gebruiken om rond te krijgen. Meer disscusions kan worden gevonden hier .

Hier het voorbeeld

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper
@unpack_args
def func(x, y):
    return x + y

Dan kunt u deze kaart te brengen met rits argumenten

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

Natuurlijk, kunt u altijd gebruik maken van Pool.starmapPython 3 (& gt; = 3,3). zoals vermeld in andere antwoorden


Antwoord 12

Hier is een andere manier om het te doen dat IMHO is eenvoudiger en eleganter dan een van de andere antwoorden.

Dit programma heeft een functie die twee parameters neemt, print ze uit en drukt ook de som:

import multiprocessing
def main():
    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with
# end function
def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function
if __name__ == '__main__':
    main()

uitvoer is:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

Bekijk de Python-documenten voor meer informatie:

https://docs.python.org/3/ library/multiprocessing.html#module-multiprocessing.pool

Bekijk vooral de functie starmap.

Ik gebruik Python 3.6, ik weet niet zeker of dit zal werken met oudere Python-versies

Waarom er niet zo’n duidelijk voorbeeld als dit in de documenten staat, weet ik niet zeker.


Antwoord 13

Een andere manier is om een lijst met lijsten door te geven aan een routine met één argument:

import os
from multiprocessing import Pool
def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]
pool = Pool()
pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

Men kan dan een lijst met argumenten maken met de eigen favoriete methode.


Antwoord 14

Vanaf python 3.4.4 kunt u multiprocessing.get_context() gebruiken om een contextobject te verkrijgen om meerdere startmethoden te gebruiken:

import multiprocessing as mp
def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

Of je vervangt gewoon

pool.map(harvester(text,case),case, 1)

door:

pool.apply_async(harvester(text,case),case, 1)

Antwoord 15

In de officiële documentatie staat dat het slechts één itereerbaar argument ondersteunt. In dergelijke gevallen gebruik ik graag apply_async. In jouw geval zou ik het volgende doen:

from multiprocessing import Process, Pool, Manager
text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res
def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1
if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()

Antwoord 16

Er zijn hier veel antwoorden, maar geen enkele lijkt Python 2/3-compatibele code te bieden die op elke versie werkt. Als u wilt dat uw code gewoon werkt, werkt dit voor beide Python-versies:

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

Daarna kun je multiprocessing gebruiken op de reguliere Python 3-manier, zoals je wilt. Bijvoorbeeld:

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

werkt in Python 2 of Python 3.


Antwoord 17

text = "test"
def unpack(args):
    return args[0](*args[1:])
def harvester(text, case):
    X = case[0]
    text+ str(X)
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples 
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one
    pool.map(unpack, args)
    pool.close()
    pool.join()

Antwoord 18

Dit is een voorbeeld van de routine die ik gebruik om meerdere argumenten door te geven aan een functie met één argument die wordt gebruikt in een pool.imapvork:

from multiprocessing import Pool
# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2
# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]
# Open the pool:
pool = Pool(processes=2)
# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun
    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)
# Close the pool
pool.close()

Antwoord 19

Dit is misschien een andere optie. De truc zit in de functie wrapperdie een andere functie retourneert die wordt doorgegeven aan pool.map. De onderstaande code leest een invoerarray en retourneert voor elk (uniek) element daarin hoe vaak (dwz telt) dat element in de array voorkomt, bijvoorbeeld als de invoer is

np.eye(3) = [ [1. 0. 0.]
              [0. 1. 0.]
              [0. 0. 1.]]

dan verschijnt nul 6 keer en één 3 keer

import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count
def extract_counts(label_array):
    labels = np.unique(label_array)
    out = extract_counts_helper([label_array], labels)
    return out
def extract_counts_helper(args, labels):
    n = max(1, cpu_count() - 1)
    pool = ThreadPool(n)
    results = {}
    pool.map(wrapper(args, results), labels)
    pool.close()
    pool.join()
    return results
def wrapper(argsin, results):
    def inner_fun(label):
        label_array = argsin[0]
        counts = get_label_counts(label_array, label)
        results[label] = counts
    return inner_fun
def get_label_counts(label_array, label):
    return sum(label_array.flatten() == label)
if __name__ == "__main__":
    img = np.ones([2,2])
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
    img = np.eye(3)
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
    img = np.random.randint(5, size=(3, 3))
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

U moet:

input array: 
 [[1. 1.]
 [1. 1.]]
label counts:  {1.0: 4}
========
input array: 
 [[1. 0. 0.]
 [0. 1. 0.]
 [0. 0. 1.]]
label counts:  {0.0: 6, 1.0: 3}
========
input array: 
 [[4 4 0]
 [2 4 3]
 [2 3 1]]
label counts:  {0: 1, 1: 1, 2: 2, 3: 2, 4: 3}
========

Antwoord 20

Bewaar al uw argumenten als een reeks van tuples .

Voorbeeld Zeg normaal gesproken belt u uw functie als

def mainImage(fragCoord : vec2, iResolution : vec3, iTime : float) -> vec3:

Pas in plaats daarvan één tuple en pak de argumenten uit

def mainImage(package_iter) -> vec3: 
    fragCoord=package_iter[0]  
    iResolution=package_iter[1]
    iTime=package_iter[2]

Bouw de tuple op door een lus te gebruiken voor de hand

   package_iter = [] 
    iResolution = vec3(nx,ny,1)
    for j in range( (ny-1), -1, -1):
        for i in range( 0, nx, 1): 
            fragCoord : vec2 = vec2(i,j)
            time_elapsed_seconds = 10
            package_iter.append(  (fragCoord, iResolution, time_elapsed_seconds)  )

voer vervolgens alles uit met behulp van de kaart door de ARRAY van TUPLES door te geven

   array_rgb_values = []
    with concurrent.futures.ProcessPoolExecutor() as executor: 
        for  val in executor.map(mainImage, package_iter):          
            fragColor=val
            ir = clip( int(255* fragColor.r), 0, 255)
            ig = clip(int(255* fragColor.g), 0, 255)
            ib= clip(int(255* fragColor.b), 0, 255)
            array_rgb_values.append( (ir,ig,ib) )

Ik weet dat Python * en ** heeft voor het uitpakken , maar ik heb die nog niet geprobeerd.
Het is ook beter om gelijktijdige futures van een bibliotheek op een hoger niveau te gebruiken dan de multiprocessing-bibliotheek op een laag niveau


Antwoord 21

import time
from multiprocessing import Pool
def f1(args):
    vfirst, vsecond, vthird = args[0] , args[1] , args[2]
    print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')
    pass
if __name__ == '__main__':
    p = Pool()
    result = p.map(f1, [['Dog','Cat','Mouse']])
    p.close()
    p.join()
    print(result)

Antwoord 22

voor python2 kun je deze truc gebruiken

def fun(a,b):
    return a+b
pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))

Other episodes