Hoe parallelliseer ik een eenvoudige Python-loop?

Dit is waarschijnlijk een triviale vraag, maar hoe parallelliseer ik de volgende lus in python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)
    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Ik weet hoe ik enkele threads in Python moet starten, maar ik weet niet hoe ik de resultaten moet “verzamelen”.

Meerdere processen zou ook goed zijn – wat in dit geval het gemakkelijkst is. Ik gebruik momenteel Linux, maar de code zou ook op Windows en Mac moeten draaien.

Wat is de gemakkelijkste manier om deze code te parallelliseren?


Antwoord 1, autoriteit 100%

Het gebruik van meerdere threads op CPython levert geen betere prestaties op voor pure Python-code vanwege de globale interpreter lock (GIL). Ik raad aan om in plaats daarvan de module multiprocessingte gebruiken:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Merk op dat dit niet werkt in de interactieve tolk.

Om de gebruikelijke FUD rond de GIL te vermijden: Het zou sowieso geen enkel voordeel hebben om threads voor dit voorbeeld te gebruiken. Je wiltprocessen hier gebruiken, geen threads, omdat ze een heleboel problemen voorkomen.


Antwoord 2, autoriteit 38%

from joblib import Parallel, delayed
def process(i):
    return i * i
results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)

Het bovenstaande werkt prachtig op mijn computer (Ubuntu, pakket joblib was vooraf geïnstalleerd, maar kan worden geïnstalleerd via pip install joblib).

Overgenomen van https://blog.dominodatalab.com/simple-parallelization/


Bewerken op 31 maart 2021: op joblib, multiprocessing, threadingen asyncio

  • joblibin de bovenstaande code gebruikt import multiprocessingonder de motorkap (en dus meerdere processen, wat doorgaans de beste manier is om CPU-werk over kernen uit te voeren – vanwege de GIL)
  • Je kunt joblibmeerdere threads laten gebruiken in plaats van meerdere processen, maar dit (of rechtstreeks import threadinggebruiken) is alleen voordelig als de threads veel tijd besteden aan I/ O (bijv. lezen/schrijven naar schijf, een HTTP-verzoek verzenden). Voor I/O-werk blokkeert de GIL de uitvoering van een andere thread niet
  • Sinds Python 3.7, als alternatief voor threading, kun je parallel werken met asyncio, maar hetzelfde advies is van toepassing als voor import threading(hoewel in tegenstelling tot laatstgenoemde slechts 1 thread wordt gebruikt)
  • Het gebruik van meerdere processen brengt overhead met zich mee. U moet zelf controleren of het bovenstaande codefragment uw muurtijd verbetert. Hier is er nog een, waarvoor ik heb bevestigd dat joblibbetere resultaten oplevert:
import time
from joblib import Parallel, delayed
def countdown(n):
    while n>0:
        n -= 1
    return n
t = time.time()
for _ in range(20):
    print(countdown(10**7), end=" ")
print(time.time() - t)  
# takes ~10.5 seconds on medium sized Macbook Pro
t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro

Antwoord 3, autoriteit 35%

Om een ​​eenvoudige for-lus te parallelliseren, voegt joblibveel waarde toe aan ruw gebruik van multiprocessing. Niet alleen de korte syntaxis, maar ook zaken als transparante opeenhoping van iteraties wanneer ze erg snel zijn (om de overhead te verwijderen) of het vastleggen van de traceback van het onderliggende proces, voor betere foutrapportage.

Disclaimer: ik ben de oorspronkelijke auteur van joblib.


Antwoord 4, autoriteit 28%

Wat is de gemakkelijkste manier om deze code te parallelliseren?

Gebruik een PoolExecutor van concurrent.futures. Vergelijk de originele code hiermee, zij aan zij. Ten eerste is de meest beknopte manier om dit te benaderen met executor.map:

...
with ProcessPoolExecutor() as executor:
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        ...

of uitgesplitst door elke oproep afzonderlijk in te dienen:

...
with ThreadPoolExecutor() as executor:
    futures = []
    for parameter in parameters:
        futures.append(executor.submit(calc_stuff, parameter))
    for future in futures:
        out1, out2, out3 = future.result() # this will block
        ...

Het verlaten van de context geeft de uitvoerder aan om middelen vrij te maken

Je kunt threads of processen gebruiken en exact dezelfde interface gebruiken.

Een werkend voorbeeld

Hier is een werkende voorbeeldcode, die de waarde aantoont van:

Zet dit in een bestand – futuretest.py:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection
def processor_intensive(arg):
    def fib(n): # recursive, processor intensive calculation (avoid n > 36)
        return fib(n-1) + fib(n-2) if n > 1 else n
    start = time()
    result = fib(arg)
    return time() - start, result
def io_bound(arg):
    start = time()
    con = HTTPSConnection(arg)
    con.request('GET', '/')
    result = con.getresponse().getcode()
    return time() - start, result
def manager(PoolExecutor, calc_stuff):
    if calc_stuff is io_bound:
        inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
                  'noaa.gov', 'parler.com', 'aaronhall.dev')
    else:
        inputs = range(25, 32)
    timings, results = list(), list()
    start = time()
    with PoolExecutor() as executor:
        for timing, result in executor.map(calc_stuff, inputs):
            # put results into correct output list:
            timings.append(timing), results.append(result)
    finish = time()
    print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
    print(f'wall time to execute: {finish-start}')
    print(f'total of timings for each call: {sum(timings)}')
    print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
    print(dict(zip(inputs, results)), end = '\n\n')
def main():
    for computation in (processor_intensive, io_bound):
        for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
            manager(pool_executor, calc_stuff=computation)
if __name__ == '__main__':
    main()

En hier is de uitvoer voor één run van python -m futuretest:

processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}
processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}
io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}
io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

Processor-intensieve analyse

Verwacht bij het uitvoeren van processorintensieve berekeningen in Python dat de ProcessPoolExecutorbeter presteert dan de ThreadPoolExecutor.

Vanwege de Global Interpreter Lock (ook wel de GIL genoemd) kunnen threads niet meerdere processors gebruiken, dus verwacht dat de tijd voor elke berekening en de muurtijd (verstreken realtime) groter zal zijn.

IO-gebonden analyse

Aan de andere kant, verwacht bij het uitvoeren van IO-gebonden bewerkingen dat ThreadPoolExecutorbeter presteert dan ProcessPoolExecutor.

Python’s threads zijn echt, OS, threads. Ze kunnen door het besturingssysteem in de sluimerstand worden gezet en weer worden gewekt wanneer hun informatie binnenkomt.

Laatste gedachten

Ik vermoed dat multiprocessing langzamer zal zijn op Windows, aangezien Windows forking niet ondersteunt, dus elk nieuw proces heeft tijd nodig om te starten.

U kunt meerdere threads in meerdere processen nesten, maar het wordt aanbevolen om niet meerdere threads te gebruiken om meerdere processen af ​​te splitsen.

Als je met een zwaar verwerkingsprobleem in Python wordt geconfronteerd, kun je triviaal schalen met extra processen, maar niet zozeer met threading.


Antwoord 5, autoriteit 16%

Dit is de gemakkelijkste manier om het te doen!

U kunt asynciogebruiken. (Documentatie is hierte vinden). Het wordt gebruikt als basis voor meerdere asynchrone Python-frameworks die krachtige netwerk- en webservers, databaseverbindingsbibliotheken, gedistribueerde taakwachtrijen, enz. bieden. Bovendien heeft het zowel API’s op hoog als laag niveau om elk soort probleem op te lossen .

import asyncio
def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
    return wrapped
@background
def your_function(argument):
    #code

Nu zal deze functie parallel worden uitgevoerd wanneer deze wordt aangeroepen zonder het hoofdprogramma in de wachtstand te zetten. Je kunt het ook gebruiken om for loop te parallelliseren. Wanneer een for-lus wordt aangeroepen, is de lus sequentieel, maar elke iteratie loopt parallel aan het hoofdprogramma zodra de interpreter daar aankomt.
Bijvoorbeeld:

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))
for i in range(10):
    your_function(i)
print('loop finished')

Dit levert de volgende uitvoer op:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

Antwoord 6, autoriteit 9%

Er zijn een aantal voordelen verbonden aan het gebruik van Ray:

  • U kunt naast meerdere kernen (met dezelfde code) over meerdere machines parallelliseren.
  • Efficiënte verwerking van numerieke gegevens via gedeeld geheugen (en serialisatie zonder kopieën).
  • Hoge taakdoorvoer met gedistribueerde planning.
  • Fouttolerantie.

In jouw geval zou je Ray kunnen starten en een externe functie kunnen definiëren

import ray
ray.init()
@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

en roep het dan parallel aan

output1, output2, output3 = [], [], []
# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)
# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

Om hetzelfde voorbeeld op een cluster uit te voeren, zou de enige regel die zou veranderen de aanroep naar ray.init() zijn. De relevante documentatie is hierte vinden.

Merk op dat ik Ray help ontwikkelen.


Antwoord 7, autoriteit 3%

Ik vond jobliberg handig voor mij. Zie het volgende voorbeeld:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s
element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs=-1: gebruik alle beschikbare kernen


Antwoord 8, autoriteit 2%

waarom gebruik je geen threads en één mutex om één globale lijst te beschermen?

import os
import re
import time
import sys
import thread
from threading import Thread
class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   
threads = []
output = []
mutex = thread.allocate_lock()
for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()
for t in threads:
    t.join()
#here you have output list filled with data

Houd er rekening mee dat je zo snel bent als je langzaamste thread


Antwoord 9

Stel dat we een asynchrone functie hebben

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

Dat moet op een grote array worden uitgevoerd. Sommige attributen worden doorgegeven aan het programma en sommige worden gebruikt vanuit de eigenschap van het dictionary-element in de array.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))

Antwoord 10

bedankt @iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count
def add_1(x):
    return x + 1
if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'

Antwoord 11

Dit kan handig zijn bij het implementeren van multiprocessing en parallel/distributed computing in Python.

YouTube-zelfstudie over het gebruik van het Techila-pakket

Techila is een gedistribueerde computermiddleware die rechtstreeks met Python kan worden geïntegreerd met behulp van het techila-pakket. De perzikfunctie in het pakket kan handig zijn bij het parallelliseren van lusstructuren. (Het volgende codefragment is afkomstig van de Techila-communityforums)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )

Antwoord 12

Dask-futures; Het verbaast me dat nog niemand het heeft genoemd. . .

from dask.distributed import Client
client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)
def my_function(i):
    output = <code to execute in the for loop here>
    return output
futures = []
for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)
results = client.gather(futures)
client.close()

Antwoord 13

Kijk hier eens naar;

http://docs.python.org/library/queue.html

Dit is misschien niet de juiste manier om het te doen, maar ik zou iets doen als;

Echte code;

from multiprocessing import Process, JoinableQueue as Queue 
class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

Hopelijk helpt dat.


Antwoord 14

De gelijktijdige-wrappers door de tqdm-bibliotheekzijn een leuke manier om langerlopende code te parallelliseren. tqdm geeft feedback over de huidige voortgang en resterende tijd via een slimme voortgangsmeter, wat ik erg handig vind voor lange berekeningen.

Lussen kunnen worden herschreven om als gelijktijdige threads te worden uitgevoerd door een eenvoudige aanroep van thread_map, of als gelijktijdige multi-processen door een eenvoudige aanroep van process_map:

from tqdm.contrib.concurrent import thread_map, process_map
def calc_stuff(num, multiplier):
    import time
    time.sleep(1)
    return num, num * multiplier
if __name__ == "__main__":
    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]
    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)
    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)
    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)

Antwoord 15

een heel eenvoudig voorbeeld van parallelle verwerking is

from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()
def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)
        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)
if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()

Other episodes