Ik heb veel moeite om te begrijpen hoe de multiprocessing-wachtrij werkt op python en hoe deze te implementeren. Laten we zeggen dat ik twee python-modules heb die toegang hebben tot gegevens uit een gedeeld bestand, laten we deze twee modules een schrijver en een lezer noemen. Mijn plan is om zowel de lezer als de schrijver verzoeken in twee aparte multiprocessing-wachtrijen te laten plaatsen, en dan een derde proces deze verzoeken in een lus te laten plaatsen en als zodanig uit te voeren.
Mijn grootste probleem is dat ik echt niet weet hoe ik multiprocessing.queue correct moet implementeren, je kunt het object niet echt voor elk proces instantiëren, omdat het afzonderlijke wachtrijen zijn, hoe zorg je ervoor dat alle processen betrekking hebben op een gedeelde wachtrij (of in dit geval wachtrijen)
Antwoord 1, autoriteit 100%
Mijn grootste probleem is dat ik echt niet weet hoe ik multiprocessing.queue correct moet implementeren, je kunt het object niet echt voor elk proces instantiëren, omdat het afzonderlijke wachtrijen zijn, hoe zorg je ervoor dat alle processen betrekking hebben op een gedeelde wachtrij (of in dit geval wachtrijen)
Dit is een eenvoudig voorbeeld van een lezer en schrijver die een enkele wachtrij delen… De schrijver stuurt een aantal gehele getallen naar de lezer; wanneer de schrijver geen nummers meer heeft, stuurt hij ‘KLAAR’, waardoor de lezer weet dat hij uit de leeslus moet breken.
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
Antwoord 2, autoriteit 9%
Hier is een doodeenvoudig gebruik van multiprocessing.Queue
en multiprocessing.Process
waarmee bellers een “event” plus argumenten naar een apart proces kunnen sturen dat de gebeurtenis naar een “do_” -methode op het proces. (Python 3.4+)
import multiprocessing as mp
import collections
Msg = collections.namedtuple('Msg', ['event', 'args'])
class BaseProcess(mp.Process):
"""A process backed by an internal queue for simple one-way message passing.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.queue = mp.Queue()
def send(self, event, *args):
"""Puts the event and args as a `Msg` on the queue
"""
msg = Msg(event, args)
self.queue.put(msg)
def dispatch(self, msg):
event, args = msg
handler = getattr(self, "do_%s" % event, None)
if not handler:
raise NotImplementedError("Process has no handler for [%s]" % event)
handler(*args)
def run(self):
while True:
msg = self.queue.get()
self.dispatch(msg)
Gebruik:
class MyProcess(BaseProcess):
def do_helloworld(self, arg1, arg2):
print(arg1, arg2)
if __name__ == "__main__":
process = MyProcess()
process.start()
process.send('helloworld', 'hello', 'world')
De send
gebeurt in het bovenliggende proces, de do_*
gebeurt in het onderliggende proces.
Ik heb elke uitzonderingsbehandeling weggelaten die duidelijk de run-lus zou onderbreken en het onderliggende proces zou verlaten. Je kunt het ook aanpassen door run
te negeren om blokkering of wat dan ook te regelen.
Dit is eigenlijk alleen nuttig in situaties waarin je een enkel werkproces hebt, maar ik denk dat het een relevant antwoord op deze vraag is om een veelvoorkomend scenario te demonstreren met een beetje meer objectoriëntatie.
Antwoord 3, autoriteit 7%
Ik heb gekeken naar meerdere antwoorden op stapeloverloop en op internet terwijl ik probeerde een manier te bedenken om multiprocessing uit te voeren met behulp van wachtrijen voor het doorgeven van grote panda-dataframes. Het leek me dat elk antwoord hetzelfde soort oplossingen herhaalde, zonder rekening te houden met de veelheid aan randgevallen die je zeker tegenkomt bij het opzetten van berekeningen als deze. Het probleem is dat er veel dingen tegelijk spelen. Het aantal taken, het aantal werknemers, de duur van elke taak en mogelijke uitzonderingen tijdens de uitvoering van de taak. Dit alles maakt synchronisatie lastig en de meeste antwoorden gaan niet in op hoe u dit kunt doen. Dus dit is mijn mening na een paar uur gepruts, hopelijk is dit generiek genoeg voor de meeste mensen om het nuttig te vinden.
Enkele gedachten voorafgaand aan codeervoorbeelden. Aangezien queue.Empty
of queue.qsize()
of een andere gelijkaardige methode onbetrouwbaar is voor flow control, kan elke dergelijke code
while True:
try:
task = pending_queue.get_nowait()
except queue.Empty:
break
is nep. Dit zal de werknemer doden, zelfs als milliseconden later een andere taak in de wachtrij verschijnt. De werker zal niet herstellen en na een tijdje zullen ALLE werkers verdwijnen omdat ze willekeurig de wachtrij tijdelijk leeg vinden. Het eindresultaat zal zijn dat de belangrijkste multiprocessing-functie (die met de join() op de processen) terugkeert zonder dat alle taken zijn voltooid. Mooi hoor. Veel succes met debuggen als je duizenden taken hebt en er een paar ontbreken.
Het andere probleem is het gebruik van schildwachtwaarden. Veel mensen hebben voorgesteld om een schildwachtwaarde aan de wachtrij toe te voegen om het einde van de wachtrij te markeren. Maar om het te markeren aan wie precies? Als er N werkers zijn, ervan uitgaande dat N het aantal beschikbare kernen is voor geven of nemen, dan zal een enkele schildwachtwaarde het einde van de wachtrij alleen markeren voor één werker. Alle andere arbeiders zullen wachten op meer werk als er geen meer is. Typische voorbeelden die ik heb gezien zijn
while True:
task = pending_queue.get()
if task == SOME_SENTINEL_VALUE:
break
Eén werknemer krijgt de schildwachtwaarde terwijl de rest voor onbepaalde tijd wacht. Geen enkel bericht dat ik tegenkwam vermeldde dat je de schildwachtwaarde TEN MINSTE zo vaak als je werknemers hebt in de wachtrij moet plaatsen, zodat ze het ALLEMAAL krijgen.
Het andere probleem is de afhandeling van uitzonderingen tijdens het uitvoeren van taken. Nogmaals, deze moeten worden gevangen en beheerd. Bovendien, als je een wachtrij completed_tasks
hebt, moet je onafhankelijk op een deterministische manier tellen hoeveel items er in de wachtrij staan voordat je besluit dat de taak is voltooid. Opnieuw vertrouwen op wachtrijgroottes is gedoemd te mislukken en levert onverwachte resultaten op.
In het onderstaande voorbeeld zal de functie par_proc()
een lijst met taken ontvangen, inclusief de functies waarmee deze taken moeten worden uitgevoerd naast eventuele benoemde argumenten en waarden.
import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil
SENTINEL = None
def do_work(tasks_pending, tasks_completed):
# Get the current worker's name
worker_name = mp.current_process().name
while True:
try:
task = tasks_pending.get_nowait()
except queue.Empty:
print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
time.sleep(0.01)
else:
try:
if task == SENTINEL:
print(worker_name + ' no more work left to be done. Exiting...')
break
print(worker_name + ' received some work... ')
time_start = time.perf_counter()
work_func = pickle.loads(task['func'])
result = work_func(**task['task'])
tasks_completed.put({work_func.__name__: result})
time_end = time.perf_counter() - time_start
print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
except Exception as e:
print(worker_name + ' task failed. ' + str(e))
tasks_completed.put({work_func.__name__: None})
def par_proc(job_list, num_cpus=None):
# Get the number of cores
if not num_cpus:
num_cpus = psutil.cpu_count(logical=False)
print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))
# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()
# Gather processes and results here
processes = []
results = []
# Count tasks
num_tasks = 0
# Add the tasks to the queue
for job in job_list:
for task in job['tasks']:
expanded_job = {}
num_tasks = num_tasks + 1
expanded_job.update({'func': pickle.dumps(job['func'])})
expanded_job.update({'task': task})
tasks_pending.put(expanded_job)
# Use as many workers as there are cores (usually chokes the system so better use less)
num_workers = num_cpus
# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
tasks_pending.put(SENTINEL)
print('* Number of tasks: {}'.format(num_tasks))
# Set-up and start the workers
for c in range(num_workers):
p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
p.name = 'worker' + str(c)
processes.append(p)
p.start()
# Gather the results
completed_tasks_counter = 0
while completed_tasks_counter < num_tasks:
results.append(tasks_completed.get())
completed_tasks_counter = completed_tasks_counter + 1
for p in processes:
p.join()
return results
En hier is een test om de bovenstaande code tegen uit te voeren
def test_parallel_processing():
def heavy_duty1(arg1, arg2, arg3):
return arg1 + arg2 + arg3
def heavy_duty2(arg1, arg2, arg3):
return arg1 * arg2 * arg3
task_list = [
{'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
{'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
]
results = par_proc(task_list)
job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
assert job1 == 15
assert job2 == 21
plus nog een met enkele uitzonderingen
def test_parallel_processing_exceptions():
def heavy_duty1_raises(arg1, arg2, arg3):
raise ValueError('Exception raised')
return arg1 + arg2 + arg3
def heavy_duty2(arg1, arg2, arg3):
return arg1 * arg2 * arg3
task_list = [
{'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
{'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
]
results = par_proc(task_list)
job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
assert not job1
assert job2 == 21
Hopelijk is dat nuttig.
Antwoord 4, autoriteit 6%
in “from queue import Queue
” is er geen module genaamd queue
, in plaats daarvan moet multiprocessing
worden gebruikt. Daarom zou het eruit moeten zien als “from multiprocessing import Queue
“
Antwoord 5, autoriteit 3%
Zojuist een eenvoudig en algemeen voorbeeld gemaakt om te demonstreren dat een bericht via een wachtrij wordt doorgegeven tussen 2 zelfstandige programma’s. Het beantwoordt niet direct de vraag van de OP, maar moet duidelijk genoeg zijn om het concept aan te geven.
Server:
multiprocessing-queue-manager-server.py
import asyncio
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import queue
import sys
import threading
from typing import Any, AnyStr, Dict, Union
class QueueManager(multiprocessing.managers.BaseManager):
def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
pass
def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
global q
if not ident in q:
q[ident] = multiprocessing.Queue()
return q[ident]
q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict()
delattr(QueueManager, 'get_queue')
def init_queue_manager_server():
if not hasattr(QueueManager, 'get_queue'):
QueueManager.register('get_queue', get_queue)
def serve(no: int, term_ev: threading.Event):
manager: QueueManager
with QueueManager(authkey=QueueManager.__name__.encode()) as manager:
print(f"Server address {no}: {manager.address}")
while not term_ev.is_set():
try:
item: Any = manager.get_queue().get(timeout=0.1)
print(f"Client {no}: {item} from {manager.address}")
except queue.Empty:
continue
async def main(n: int):
init_queue_manager_server()
term_ev: threading.Event = threading.Event()
executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor()
i: int
for i in range(n):
asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev))
# Gracefully shut down
try:
await asyncio.get_running_loop().create_future()
except asyncio.CancelledError:
term_ev.set()
executor.shutdown()
raise
if __name__ == '__main__':
asyncio.run(main(int(sys.argv[1])))
Klant:
multiprocessing-queue-manager-client.py
import multiprocessing
import multiprocessing.managers
import os
import sys
from typing import AnyStr, Union
class QueueManager(multiprocessing.managers.BaseManager):
def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
pass
delattr(QueueManager, 'get_queue')
def init_queue_manager_client():
if not hasattr(QueueManager, 'get_queue'):
QueueManager.register('get_queue')
def main():
init_queue_manager_client()
manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode())
manager.connect()
message = f"A message from {os.getpid()}"
print(f"Message to send: {message}")
manager.get_queue().put(message)
if __name__ == '__main__':
main()
Gebruik
Server:
$ python3 multiprocessing-queue-manager-server.py N
N
is een geheel getal dat aangeeft hoeveel servers moeten worden gemaakt. Kopieer een van de <server-address-N>
-uitvoer van de server en maak dit het eerste argument van elke multiprocessing-queue-manager-client.py
.
Klant:
python3 multiprocessing-queue-manager-client.py <server-address-1>
Resultaat
Server:
Client 1: <item> from <server-address-1>
Samenvatting: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5
UPD: hiereen pakket gemaakt.
Server:
import ipcq
with ipcq.QueueManagerServer(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO) as server:
server.get_queue().get()
Klant:
import ipcq
client = ipcq.QueueManagerClient(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO)
client.get_queue().put('a message')
Antwoord 6
We hebben twee versies hiervan geïmplementeerd, één een eenvoudige multi threadpool die vele soorten callables kan uitvoeren, wat ons leven veel gemakkelijker maakt, en de tweede versie die gebruik maakt van processen, wat minder flexibel is qua callables en extra call to dille vereist.
Als frozen_pool op true wordt ingesteld, wordt de uitvoering bevriezen totdat finish_pool_queue in een van beide klassen wordt aangeroepen.
Draadversie:
'''
Created on Nov 4, 2019
@author: Kevin
'''
from threading import Lock, Thread
from Queue import Queue
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
class ThreadPool(object):
def __init__(self, queue_threads, *args, **kwargs):
self.frozen_pool = kwargs.get('frozen_pool', False)
self.print_queue = kwargs.get('print_queue', True)
self.pool_results = []
self.lock = Lock()
self.queue_threads = queue_threads
self.queue = Queue()
self.threads = []
for i in range(self.queue_threads):
t = Thread(target=self.make_pool_call)
t.daemon = True
t.start()
self.threads.append(t)
def make_pool_call(self):
while True:
if self.frozen_pool:
#print '--> Queue is frozen'
sleep(1)
continue
item = self.queue.get()
if item is None:
break
call = item.get('call', None)
args = item.get('args', [])
kwargs = item.get('kwargs', {})
keep_results = item.get('keep_results', False)
try:
result = call(*args, **kwargs)
if keep_results:
self.lock.acquire()
self.pool_results.append((item, result))
self.lock.release()
except Exception as e:
self.lock.acquire()
print e
traceback.print_exc()
self.lock.release()
os.kill(os.getpid(), signal.SIGUSR1)
self.queue.task_done()
def finish_pool_queue(self):
self.frozen_pool = False
while self.queue.unfinished_tasks > 0:
if self.print_queue:
print_info('--> Thread pool... %s' % self.queue.unfinished_tasks)
sleep(5)
self.queue.join()
for i in range(self.queue_threads):
self.queue.put(None)
for t in self.threads:
t.join()
del self.threads[:]
def get_pool_results(self):
return self.pool_results
def clear_pool_results(self):
del self.pool_results[:]
Procesversie:
'''
Created on Nov 4, 2019
@author: Kevin
'''
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\
RawArray, Manager
from dill import dill
import ctypes
from helium.misc.utils import ignore_exception
from mem_top import mem_top
import gc
class ProcessPool(object):
def __init__(self, queue_processes, *args, **kwargs):
self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False))
self.print_queue = kwargs.get('print_queue', True)
self.manager = Manager()
self.pool_results = self.manager.list()
self.queue_processes = queue_processes
self.queue = JoinableQueue()
self.processes = []
for i in range(self.queue_processes):
p = Process(target=self.make_pool_call)
p.start()
self.processes.append(p)
print 'Processes', self.queue_processes
def make_pool_call(self):
while True:
if self.frozen_pool.value:
sleep(1)
continue
item_pickled = self.queue.get()
if item_pickled is None:
#print '--> Ending'
self.queue.task_done()
break
item = dill.loads(item_pickled)
call = item.get('call', None)
args = item.get('args', [])
kwargs = item.get('kwargs', {})
keep_results = item.get('keep_results', False)
try:
result = call(*args, **kwargs)
if keep_results:
self.pool_results.append(dill.dumps((item, result)))
else:
del call, args, kwargs, keep_results, item, result
except Exception as e:
print e
traceback.print_exc()
os.kill(os.getpid(), signal.SIGUSR1)
self.queue.task_done()
def finish_pool_queue(self, callable=None):
self.frozen_pool.value = False
while self.queue._unfinished_tasks.get_value() > 0:
if self.print_queue:
print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value()))
if callable:
callable()
sleep(5)
for i in range(self.queue_processes):
self.queue.put(None)
self.queue.join()
self.queue.close()
for p in self.processes:
with ignore_exception: p.join(10)
with ignore_exception: p.terminate()
with ignore_exception: del self.processes[:]
def get_pool_results(self):
return self.pool_results
def clear_pool_results(self):
del self.pool_results[:]
def test(eg): print 'EG', eg
Bel met een van beide:
tp = ThreadPool(queue_threads=2)
tp.queue.put({'call': test, 'args': [random.randint(0, 100)]})
tp.finish_pool_queue()
of
pp = ProcessPool(queue_processes=2)
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.finish_pool_queue()
Antwoord 7
Een voorbeeld van meerdere producenten en meerdere consumenten, geverifieerd. Het zou gemakkelijk moeten zijn om het aan te passen voor andere gevallen, single/multi-producenten, single/multi-consumenten.
from multiprocessing import Process, JoinableQueue
import time
import os
q = JoinableQueue()
def producer():
for item in range(30):
time.sleep(2)
q.put(item)
pid = os.getpid()
print(f'producer {pid} done')
def worker():
while True:
item = q.get()
pid = os.getpid()
print(f'pid {pid} Working on {item}')
print(f'pid {pid} Finished {item}')
q.task_done()
for i in range(5):
p = Process(target=worker, daemon=True).start()
# send thirty task requests to the worker
producers = []
for i in range(2):
p = Process(target=producer)
producers.append(p)
p.start()
# make sure producers done
for p in producers:
p.join()
# block until all workers are done
q.join()
print('All work completed')
Uitleg:
- Twee producenten en vijf consumenten in dit voorbeeld.
- JoinableQueue wordt gebruikt om ervoor te zorgen dat alle elementen die in de wachtrij zijn opgeslagen, worden verwerkt. ‘task_done’ is voor de werknemer om te melden dat een element is voltooid. ‘q.join()’ wacht op alle elementen die als gereed zijn gemarkeerd.
- Met #2 is het niet nodig om op elke werknemer te wachten.
- Maar het is belangrijk om mee te doen en te wachten tot elke producent het element in de wachtrij opslaat. Anders sluit u het programma onmiddellijk af.