Hoog geheugengebruik met Python Multiprocessing

Ik heb een aantal berichten gezien over geheugengebruik met de Python Multiprocessing-module. De vragen lijken echter geen antwoord te geven op het probleem dat ik hier heb. Ik post mijn analyse in de hoop dat iemand me kan helpen.

Probleem

Ik gebruik multiprocessing om taken parallel uit te voeren en ik heb gemerkt dat het geheugengebruik door de werkprocessen oneindig toeneemt. Ik heb een klein op zichzelf staand voorbeeld dat zou moeten repliceren wat ik opmerk.

import multiprocessing as mp
import time
def calculate(num):
    l = [num*num for num in range(num)]
    s = sum(l)
    del l       # delete lists as an  option
    return s
if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    time.sleep(5)
    print "launching calculation"
    num_tasks = 1000
    tasks =  [pool.apply_async(calculate,(i,)) for i in range(num_tasks)]
    for f in tasks:    
        print f.get(5)
    print "calculation finished"
    time.sleep(10)
    print "closing  pool"
    pool.close()
    print "closed pool"
    print "joining pool"
    pool.join()
    print "joined pool"
    time.sleep(5)

Systeem

Ik gebruik Windows en ik gebruik Taakbeheer om het geheugengebruik te controleren. Ik gebruik Python 2.7.6.

Observatie

Ik heb het geheugenverbruik van de twee werkprocessen hieronder samengevat.

+---------------+----------------------+----------------------+
|  num_tasks    |  memory with del     | memory without del   |
|               | proc_1   | proc_2    | proc_1   | proc_2    |
+---------------+----------------------+----------------------+
| 1000          | 4884     | 4694      | 4892     | 4952      |
| 5000          | 5588     | 5596      | 6140     | 6268      |
| 10000         | 6528     | 6580      | 6640     | 6644      |
+---------------+----------------------+----------------------+

In de bovenstaande tabel heb ik geprobeerd het aantal taken te wijzigen en het geheugengebruik te observeren aan het einde van alle berekeningen en voordat ik join-aankwam bij de Pool. De ‘del’ en ‘without del’ opties zijn of ik commentaar of commentaar op de del lregel binnen de calculate(num)functie respectievelijk verwijder. Vóór berekening is het geheugenverbruik ongeveer 4400.

  1. Het lijkt erop dat het handmatig wissen van de lijsten resulteert in een lager geheugengebruik voor de werkprocessen. Ik dacht dat de vuilnisman dit wel zou regelen. Is er een manier om afvalinzameling te forceren?
  2. Het is een raadsel dat met een toename van het aantal taken, het geheugengebruik in beide gevallen blijft groeien. Is er een manier om het geheugengebruik te beperken?

Ik heb een proces dat op dit voorbeeld is gebaseerd en bedoeld is om op lange termijn te draaien. Ik merk dat deze werkprocessen veel geheugen (~ 4 GB) in beslag nemen na een nachtelijke run. Een joindoen om geheugen vrij te maken is geen optie en ik probeer een manier te vinden zonder join-ing.

Dit lijkt een beetje mysterieus. Heeft iemand iets soortgelijks tegengekomen? Hoe kan ik dit probleem oplossen?


Antwoord 1, autoriteit 100%

Ik heb veel onderzoek gedaan en kon geen oplossing vinden om het probleem op zich op te lossen. Maar er is een fatsoenlijke manier om het geheugenverlies te voorkomen voor een kleine prijs, vooral de langlopende code aan de serverzijde.

De oplossing was in wezen om individuele werkprocessen opnieuw te starten na een vast aantal taken. De klasse Poolin python neemt maxtasksperchildals argument. U kunt maxtasksperchild=1000specificeren, waardoor u de 1000 taken beperkt die op elk onderliggend proces moeten worden uitgevoerd. Nadat het maxtasksperchild-nummer is bereikt, vernieuwt de pool de onderliggende processen. Door een voorzichtig aantal voor maximale taken te gebruiken, kan men het maximale geheugen dat wordt verbruikt, in evenwicht brengen met de opstartkosten die gepaard gaan met het opnieuw opstarten van het back-endproces. De constructie van Poolis als volgt:

pool = mp.Pool(processes=2,maxtasksperchild=1000)

Ik plaats hier mijn volledige oplossing zodat anderen er iets aan kunnen hebben!

import multiprocessing as mp
import time
def calculate(num):
    l = [num*num for num in range(num)]
    s = sum(l)
    del l       # delete lists as an  option
    return s
if __name__ == "__main__":
    # fix is in the following line #
    pool = mp.Pool(processes=2,maxtasksperchild=1000)
    time.sleep(5)
    print "launching calculation"
    num_tasks = 1000
    tasks =  [pool.apply_async(calculate,(i,)) for i in range(num_tasks)]
    for f in tasks:    
        print f.get(5)
    print "calculation finished"
    time.sleep(10)
    print "closing  pool"
    pool.close()
    print "closed pool"
    print "joining pool"
    pool.join()
    print "joined pool"
    time.sleep(5)

Antwoord 2

Een potentieel probleem hier is dat resultaten in willekeurige volgorde kunnen terugkomen, maar omdat je ze in volgorde leest, moet het alle resultaten die terugkomen van de processen in het geheugen opslaan. Hoe hoger num_tasks, hoe meer resultaten het mogelijk in het geheugen moet opslaan, wachtend op uw for f in taken-lus om het resultaat te verwerken.

In het ergste geval worden de resultaten in exact omgekeerde volgorde berekend. In dat geval moeten alle resultaten door de multiprocessing-module voor u in het geheugen worden bewaard voordat uw for f in taken-lus iets gaat verwerken.

Het lijkt er echter op dat de hoeveelheid geheugen die ze gebruiken hoger is dan ik in dit geval zou verwachten (meer dan het zou moeten zijn alleen voor het opslaan van de 1000-10000 getallen die worden geretourneerd door de functie berekenen()), maar misschien is er gewoon een hoge constante overhead per werknemer resultaat dat wordt opgeslagen.

Heb je geprobeerd de parameter callbackop te geven voor apply_async, zodat je resultaten onmiddellijk kunt verwerken als ze zijn voltooid, of met imap_unordered, zodat het u resultaten kan geven zodra ze klaar zijn?

Other episodes