Hoe multiprocessing-wachtrij gebruiken in Python?

Ik heb veel moeite om te begrijpen hoe de multiprocessing-wachtrij werkt op python en hoe deze te implementeren. Laten we zeggen dat ik twee python-modules heb die toegang hebben tot gegevens uit een gedeeld bestand, laten we deze twee modules een schrijver en een lezer noemen. Mijn plan is om zowel de lezer als de schrijver verzoeken in twee aparte multiprocessing-wachtrijen te laten plaatsen, en dan een derde proces deze verzoeken in een lus te laten plaatsen en als zodanig uit te voeren.

Mijn grootste probleem is dat ik echt niet weet hoe ik multiprocessing.queue correct moet implementeren, je kunt het object niet echt voor elk proces instantiëren, omdat het afzonderlijke wachtrijen zijn, hoe zorg je ervoor dat alle processen betrekking hebben op een gedeelde wachtrij (of in dit geval wachtrijen)


Antwoord 1, autoriteit 100%

Mijn grootste probleem is dat ik echt niet weet hoe ik multiprocessing.queue correct moet implementeren, je kunt het object niet echt voor elk proces instantiëren, omdat het afzonderlijke wachtrijen zijn, hoe zorg je ervoor dat alle processen betrekking hebben op een gedeelde wachtrij (of in dit geval wachtrijen)

Dit is een eenvoudig voorbeeld van een lezer en schrijver die een enkele wachtrij delen… De schrijver stuurt een aantal gehele getallen naar de lezer; wanneer de schrijver geen nummers meer heeft, stuurt hij ‘KLAAR’, waardoor de lezer weet dat hij uit de leeslus moet breken.

from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break
def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')
if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process
        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))

Antwoord 2, autoriteit 9%

Hier is een doodeenvoudig gebruik van multiprocessing.Queueen multiprocessing.Processwaarmee bellers een “event” plus argumenten naar een apart proces kunnen sturen dat de gebeurtenis naar een “do_” -methode op het proces. (Python 3.4+)

import multiprocessing as mp
import collections
Msg = collections.namedtuple('Msg', ['event', 'args'])
class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()
    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
       msg = Msg(event, args)
       self.queue.put(msg)
    def dispatch(self, msg):
        event, args = msg
        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)
        handler(*args)
    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)

Gebruik:

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)
if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')

De sendgebeurt in het bovenliggende proces, de do_*gebeurt in het onderliggende proces.

Ik heb elke uitzonderingsbehandeling weggelaten die duidelijk de run-lus zou onderbreken en het onderliggende proces zou verlaten. Je kunt het ook aanpassen door runte negeren om blokkering of wat dan ook te regelen.

Dit is eigenlijk alleen nuttig in situaties waarin je een enkel werkproces hebt, maar ik denk dat het een relevant antwoord op deze vraag is om een veelvoorkomend scenario te demonstreren met een beetje meer objectoriëntatie.


Antwoord 3, autoriteit 7%

Ik heb gekeken naar meerdere antwoorden op stapeloverloop en op internet terwijl ik probeerde een manier te bedenken om multiprocessing uit te voeren met behulp van wachtrijen voor het doorgeven van grote panda-dataframes. Het leek me dat elk antwoord hetzelfde soort oplossingen herhaalde, zonder rekening te houden met de veelheid aan randgevallen die je zeker tegenkomt bij het opzetten van berekeningen als deze. Het probleem is dat er veel dingen tegelijk spelen. Het aantal taken, het aantal werknemers, de duur van elke taak en mogelijke uitzonderingen tijdens de uitvoering van de taak. Dit alles maakt synchronisatie lastig en de meeste antwoorden gaan niet in op hoe u dit kunt doen. Dus dit is mijn mening na een paar uur gepruts, hopelijk is dit generiek genoeg voor de meeste mensen om het nuttig te vinden.

Enkele gedachten voorafgaand aan codeervoorbeelden. Aangezien queue.Emptyof queue.qsize()of een andere gelijkaardige methode onbetrouwbaar is voor flow control, kan elke dergelijke code

while True:
    try:
        task = pending_queue.get_nowait()
    except queue.Empty:
        break

is nep. Dit zal de werknemer doden, zelfs als milliseconden later een andere taak in de wachtrij verschijnt. De werker zal niet herstellen en na een tijdje zullen ALLE werkers verdwijnen omdat ze willekeurig de wachtrij tijdelijk leeg vinden. Het eindresultaat zal zijn dat de belangrijkste multiprocessing-functie (die met de join() op de processen) terugkeert zonder dat alle taken zijn voltooid. Mooi hoor. Veel succes met debuggen als je duizenden taken hebt en er een paar ontbreken.

Het andere probleem is het gebruik van schildwachtwaarden. Veel mensen hebben voorgesteld om een schildwachtwaarde aan de wachtrij toe te voegen om het einde van de wachtrij te markeren. Maar om het te markeren aan wie precies? Als er N werkers zijn, ervan uitgaande dat N het aantal beschikbare kernen is voor geven of nemen, dan zal een enkele schildwachtwaarde het einde van de wachtrij alleen markeren voor één werker. Alle andere arbeiders zullen wachten op meer werk als er geen meer is. Typische voorbeelden die ik heb gezien zijn

while True:
    task = pending_queue.get()
    if task == SOME_SENTINEL_VALUE:
        break

Eén werknemer krijgt de schildwachtwaarde terwijl de rest voor onbepaalde tijd wacht. Geen enkel bericht dat ik tegenkwam vermeldde dat je de schildwachtwaarde TEN MINSTE zo vaak als je werknemers hebt in de wachtrij moet plaatsen, zodat ze het ALLEMAAL krijgen.

Het andere probleem is de afhandeling van uitzonderingen tijdens het uitvoeren van taken. Nogmaals, deze moeten worden gevangen en beheerd. Bovendien, als je een wachtrij completed_taskshebt, moet je onafhankelijk op een deterministische manier tellen hoeveel items er in de wachtrij staan voordat je besluit dat de taak is voltooid. Opnieuw vertrouwen op wachtrijgroottes is gedoemd te mislukken en levert onverwachte resultaten op.

In het onderstaande voorbeeld zal de functie par_proc()een lijst met taken ontvangen, inclusief de functies waarmee deze taken moeten worden uitgevoerd naast eventuele benoemde argumenten en waarden.

import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil
SENTINEL = None
def do_work(tasks_pending, tasks_completed):
    # Get the current worker's name
    worker_name = mp.current_process().name
    while True:
        try:
            task = tasks_pending.get_nowait()
        except queue.Empty:
            print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
            time.sleep(0.01)
        else:
            try:
                if task == SENTINEL:
                    print(worker_name + ' no more work left to be done. Exiting...')
                    break
                print(worker_name + ' received some work... ')
                time_start = time.perf_counter()
                work_func = pickle.loads(task['func'])
                result = work_func(**task['task'])
                tasks_completed.put({work_func.__name__: result})
                time_end = time.perf_counter() - time_start
                print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
            except Exception as e:
                print(worker_name + ' task failed. ' + str(e))
                tasks_completed.put({work_func.__name__: None})
def par_proc(job_list, num_cpus=None):
    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)
    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))
    # Set-up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()
    # Gather processes and results here
    processes = []
    results = []
    # Count tasks
    num_tasks = 0
    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)
    # Use as many workers as there are cores (usually chokes the system so better use less)
    num_workers = num_cpus
    # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
    # work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)
    print('* Number of tasks: {}'.format(num_tasks))
    # Set-up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()
    # Gather the results
    completed_tasks_counter = 0
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
    for p in processes:
        p.join()
    return results

En hier is een test om de bovenstaande code tegen uit te voeren

def test_parallel_processing():
    def heavy_duty1(arg1, arg2, arg3):
        return arg1 + arg2 + arg3
    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3
    task_list = [
        {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]
    results = par_proc(task_list)
    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
    assert job1 == 15
    assert job2 == 21

plus nog een met enkele uitzonderingen

def test_parallel_processing_exceptions():
    def heavy_duty1_raises(arg1, arg2, arg3):
        raise ValueError('Exception raised')
        return arg1 + arg2 + arg3
    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3
    task_list = [
        {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]
    results = par_proc(task_list)
    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
    assert not job1
    assert job2 == 21

Hopelijk is dat nuttig.


Antwoord 4, autoriteit 6%

in “from queue import Queue” is er geen module genaamd queue, in plaats daarvan moet multiprocessingworden gebruikt. Daarom zou het eruit moeten zien als “from multiprocessing import Queue


Antwoord 5, autoriteit 3%

Zojuist een eenvoudig en algemeen voorbeeld gemaakt om te demonstreren dat een bericht via een wachtrij wordt doorgegeven tussen 2 zelfstandige programma’s. Het beantwoordt niet direct de vraag van de OP, maar moet duidelijk genoeg zijn om het concept aan te geven.

Server:

multiprocessing-queue-manager-server.py

import asyncio
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import queue
import sys
import threading
from typing import Any, AnyStr, Dict, Union
class QueueManager(multiprocessing.managers.BaseManager):
    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass
def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
    global q
    if not ident in q:
        q[ident] = multiprocessing.Queue()
    return q[ident]
q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict()
delattr(QueueManager, 'get_queue')
def init_queue_manager_server():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue', get_queue)
def serve(no: int, term_ev: threading.Event):
    manager: QueueManager
    with QueueManager(authkey=QueueManager.__name__.encode()) as manager:
        print(f"Server address {no}: {manager.address}")
        while not term_ev.is_set():
            try:
                item: Any = manager.get_queue().get(timeout=0.1)
                print(f"Client {no}: {item} from {manager.address}")
            except queue.Empty:
                continue
async def main(n: int):
    init_queue_manager_server()
    term_ev: threading.Event = threading.Event()
    executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor()
    i: int
    for i in range(n):
        asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev))
    # Gracefully shut down
    try:
        await asyncio.get_running_loop().create_future()
    except asyncio.CancelledError:
        term_ev.set()
        executor.shutdown()
        raise
if __name__ == '__main__':
    asyncio.run(main(int(sys.argv[1])))

Klant:

multiprocessing-queue-manager-client.py

import multiprocessing
import multiprocessing.managers
import os
import sys
from typing import AnyStr, Union
class QueueManager(multiprocessing.managers.BaseManager):
    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass
delattr(QueueManager, 'get_queue')
def init_queue_manager_client():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue')
def main():
    init_queue_manager_client()
    manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode())
    manager.connect()
    message = f"A message from {os.getpid()}"
    print(f"Message to send: {message}")
    manager.get_queue().put(message)
if __name__ == '__main__':
    main()

Gebruik

Server:

$ python3 multiprocessing-queue-manager-server.py N

Nis een geheel getal dat aangeeft hoeveel servers moeten worden gemaakt. Kopieer een van de <server-address-N>-uitvoer van de server en maak dit het eerste argument van elke multiprocessing-queue-manager-client.py.

Klant:

python3 multiprocessing-queue-manager-client.py <server-address-1>

Resultaat

Server:

Client 1: <item> from <server-address-1>

Samenvatting: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5


UPD: hiereen pakket gemaakt.

Server:

import ipcq
with ipcq.QueueManagerServer(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO) as server:
    server.get_queue().get()

Klant:

import ipcq
client = ipcq.QueueManagerClient(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO)
client.get_queue().put('a message')

voer hier de afbeeldingsbeschrijving in


Antwoord 6

We hebben twee versies hiervan geïmplementeerd, één een eenvoudige multi threadpool die vele soorten callables kan uitvoeren, wat ons leven veel gemakkelijker maakt, en de tweede versie die gebruik maakt van processen, wat minder flexibel is qua callables en extra call to dille vereist.

Als frozen_pool op true wordt ingesteld, wordt de uitvoering bevriezen totdat finish_pool_queue in een van beide klassen wordt aangeroepen.

Draadversie:

'''
Created on Nov 4, 2019
@author: Kevin
'''
from threading import Lock, Thread
from Queue import Queue
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
class ThreadPool(object):
    def __init__(self, queue_threads, *args, **kwargs):
        self.frozen_pool = kwargs.get('frozen_pool', False)
        self.print_queue = kwargs.get('print_queue', True)
        self.pool_results = []
        self.lock = Lock()
        self.queue_threads = queue_threads
        self.queue = Queue()
        self.threads = []
        for i in range(self.queue_threads):
            t = Thread(target=self.make_pool_call)
            t.daemon = True
            t.start()
            self.threads.append(t)
    def make_pool_call(self):
        while True:
            if self.frozen_pool:
                #print '--> Queue is frozen'
                sleep(1)
                continue
            item = self.queue.get()
            if item is None:
                break
            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)
            try:
                result = call(*args, **kwargs)
                if keep_results:
                    self.lock.acquire()
                    self.pool_results.append((item, result))
                    self.lock.release()
            except Exception as e:
                self.lock.acquire()
                print e
                traceback.print_exc()
                self.lock.release()
                os.kill(os.getpid(), signal.SIGUSR1)
            self.queue.task_done()
    def finish_pool_queue(self):
        self.frozen_pool = False
        while self.queue.unfinished_tasks > 0:
            if self.print_queue:
                print_info('--> Thread pool... %s' % self.queue.unfinished_tasks)
            sleep(5)
        self.queue.join()
        for i in range(self.queue_threads):
            self.queue.put(None)
        for t in self.threads:
            t.join()
        del self.threads[:]
    def get_pool_results(self):
        return self.pool_results
    def clear_pool_results(self):
        del self.pool_results[:]

Procesversie:

 '''
Created on Nov 4, 2019
@author: Kevin
'''
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\
    RawArray, Manager
from dill import dill
import ctypes
from helium.misc.utils import ignore_exception
from mem_top import mem_top
import gc
class ProcessPool(object):
    def __init__(self, queue_processes, *args, **kwargs):
        self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False))
        self.print_queue = kwargs.get('print_queue', True)
        self.manager = Manager()
        self.pool_results = self.manager.list()
        self.queue_processes = queue_processes
        self.queue = JoinableQueue()
        self.processes = []
        for i in range(self.queue_processes):
            p = Process(target=self.make_pool_call)
            p.start()
            self.processes.append(p)
        print 'Processes', self.queue_processes
    def make_pool_call(self):
        while True:
            if self.frozen_pool.value:
                sleep(1)
                continue
            item_pickled = self.queue.get()
            if item_pickled is None:
                #print '--> Ending'
                self.queue.task_done()
                break
            item = dill.loads(item_pickled)
            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)
            try:
                result = call(*args, **kwargs)
                if keep_results:
                    self.pool_results.append(dill.dumps((item, result)))
                else:
                    del call, args, kwargs, keep_results, item, result
            except Exception as e:
                print e
                traceback.print_exc()
                os.kill(os.getpid(), signal.SIGUSR1)
            self.queue.task_done()
    def finish_pool_queue(self, callable=None):
        self.frozen_pool.value = False
        while self.queue._unfinished_tasks.get_value() > 0:
            if self.print_queue:
                print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value()))
            if callable:
                callable()
            sleep(5)
        for i in range(self.queue_processes):
            self.queue.put(None)
        self.queue.join()
        self.queue.close()
        for p in self.processes:
            with ignore_exception: p.join(10)
            with ignore_exception: p.terminate()
        with ignore_exception: del self.processes[:]
    def get_pool_results(self):
        return self.pool_results
    def clear_pool_results(self):
        del self.pool_results[:]
def test(eg):
        print 'EG', eg

Bel met een van beide:

tp = ThreadPool(queue_threads=2)
tp.queue.put({'call': test, 'args': [random.randint(0, 100)]})
tp.finish_pool_queue()

of

pp = ProcessPool(queue_processes=2)
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.finish_pool_queue()

Antwoord 7

Een voorbeeld van meerdere producenten en meerdere consumenten, geverifieerd. Het zou gemakkelijk moeten zijn om het aan te passen voor andere gevallen, single/multi-producenten, single/multi-consumenten.

from multiprocessing import Process, JoinableQueue
import time
import os
q = JoinableQueue()
def producer():
    for item in range(30):
        time.sleep(2)
        q.put(item)
    pid = os.getpid()
    print(f'producer {pid} done')
def worker():
    while True:
        item = q.get()
        pid = os.getpid()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        q.task_done()
for i in range(5):
    p = Process(target=worker, daemon=True).start()
# send thirty task requests to the worker
producers = []
for i in range(2):
    p = Process(target=producer)
    producers.append(p)
    p.start()
# make sure producers done
for p in producers:
    p.join()
# block until all workers are done
q.join()
print('All work completed')

Uitleg:

  1. Twee producenten en vijf consumenten in dit voorbeeld.
  2. JoinableQueue wordt gebruikt om ervoor te zorgen dat alle elementen die in de wachtrij zijn opgeslagen, worden verwerkt. ‘task_done’ is voor de werknemer om te melden dat een element is voltooid. ‘q.join()’ wacht op alle elementen die als gereed zijn gemarkeerd.
  3. Met #2 is het niet nodig om op elke werknemer te wachten.
  4. Maar het is belangrijk om mee te doen en te wachten tot elke producent het element in de wachtrij opslaat. Anders sluit u het programma onmiddellijk af.

Other episodes