Hoe parallel programmeren in Python?

Voor C++ kunnen we OpenMP gebruiken om parallel te programmeren; OpenMP werkt echter niet voor Python. Wat moet ik doen als ik sommige delen van mijn Python-programma wil parallellen?

De structuur van de code kan worden beschouwd als:

solve1(A)
solve2(B)

Waar solve1en solve2twee onafhankelijke functies zijn. Hoe dit soort code parallel in plaats van achter elkaar uit te voeren om de looptijd te verkorten?
De code is:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4
    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)
        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break
        node1 = partition[0]
        node2 = partition[1]
        G = updateGraph(G, node1, node2)
        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

Waar setinneren setoutertwee onafhankelijke functies zijn. Dat is waar ik parallel wil lopen…


Antwoord 1, autoriteit 100%

U kunt de module multiprocessinggebruiken. Voor dit geval zou ik een verwerkingspool kunnen gebruiken:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

Hierdoor ontstaan processen die generiek werk voor u kunnen doen. Aangezien we processesniet hebben doorgegeven, zal het één proces voortbrengen voor elke CPU-kern op uw machine. Elke CPU-kern kan één proces tegelijk uitvoeren.

Als u een lijst wilt toewijzen aan een enkele functie, doet u dit:

args = [A, B]
results = pool.map(solve1, args)

Gebruik geen threads omdat de GILalle bewerkingen op python-objecten vergrendelt.


Antwoord 2, autoriteit 21%

Dit kan heel elegant worden gedaan met Ray.

Om uw voorbeeld parallel te laten lopen, moet u uw functies definiëren met de @ray.remote-decorator en ze vervolgens aanroepen met .remote.

import ray
ray.init()
# Define the functions.
@ray.remote
def solve1(a):
    return 1
@ray.remote
def solve2(b):
    return 2
# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)
# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

Dit heeft een aantal voordelen ten opzichte van de multiprocessing-module.

  1. Dezelfde code zal zowel op een multicore-machine als op een cluster van machines worden uitgevoerd.
  2. Processen delen gegevens efficiënt via gedeeld geheugen en serialisatie zonder kopiëren.
  3. Foutberichten worden netjes verspreid.
  4. Deze functieaanroepen kunnen samen worden samengesteld, bijv.

    @ray.remote
    def f(x):
        return x + 1
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
  5. Naast het aanroepen van functies op afstand kunnen klassen op afstand worden geïnstantieerd als acteurs .

Merk op dat ray een raamwerk is dat ik heb geholpen.


3, Autoriteit 3%

CPYTHON gebruikt het Global Tolketer-slot dat parallel programmeert, een beetje interessanter dan C++

Dit onderwerp heeft verschillende nuttige voorbeelden en beschrijvingen van de uitdaging:

Python Global Tollow Lock ( Gil) Workaround op multi-core-systemen met behulp van Tasket op Linux?


4, Autoriteit 3%

De oplossing, zoals anderen hebben gezegd, is om meerdere processen te gebruiken. Welk kader is echter geschikt, hangt echter van vele factoren af. Naast de al genoemde, is er ook charm4py en MPI4PY (ik ben de ontwikkelaar van Charm4py).

Er is een efficiëntere manier om het bovenstaande voorbeeld te implementeren dan het gebruik van de zwembadonttrekking van de werknemer. De hoofdlus verzendt dezelfde parameters (inclusief de volledige grafiek G) en naar werknemers in elk van de 1000 iteraties. Aangezien ten minste één werknemer op een ander proces zal verblijven, houdt dit in het kopiëren en verzenden van de argumenten naar het andere proces (ES). Dit kan erg duur zijn, afhankelijk van de grootte van de objecten. In plaats daarvan is het logisch om de staat van werknemers op te staan ​​en eenvoudig de bijgewerkte informatie te verzenden.

Bijvoorbeeld, in Charm4py kan dit zo worden gedaan:

class Worker(Chare):
    def __init__(self, Q, G, n):
        self.G = G
        ...
    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...
def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B
        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Merk op dat we voor dit voorbeeld echt slechts één werknemer nodig hebben. De hoofdlus kan een van de functies uitvoeren en laat de werknemer de andere uitvoeren. Maar mijn code helpt om een ​​paar dingen te illustreren:

  1. werknemer a runs in proces 0 (hetzelfde als de hoofdlus). Terwijl result_a.get()is geblokkeerd op het resultaat, werknemer A doet de berekening in hetzelfde proces.
  2. Argumenten worden automatisch doorgegeven aan de hand van werknemer A, omdat het in hetzelfde is
    proces (er is geen kopiëren bij betrokken).

5

U kunt joblibBibliotheek gebruiken om parallelle berekening en multiprocessing te doen.

from joblib import Parallel, delayed

U kunt eenvoudig een functie maken foowaarmee u parallel wilt worden uitgevoerd en op basis van het volgende stuk code implementeer parallelle verwerking:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

Waar num_coreskan worden verkregen van multiprocessingBibliotheek zoals gevolgd:

import multiprocessing
num_cores = multiprocessing.cpu_count()

Als u een functie hebt met meer dan één ingangsargument, en u gewoon over een van de argumenten door een lijst wilt herhalen, kunt u de partialFunctie van functoolsBibliotheek als volgt:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

U kunt een volledige uitleg vinden van de Python en R Multiprocessing met een paar voorbeelden hier .

Other episodes