Threading-pool vergelijkbaar met de multiprocessing-pool?

Is er een poolklasse voor worker-threads, vergelijkbaar met de Poolklasse?

Ik hou bijvoorbeeld van de gemakkelijke manier om een kaartfunctie te parallelliseren

def long_running_func(p):
    c_func_no_gil(p)
p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

ik zou het echter willen doen zonder de overhead van het maken van nieuwe processen.

Ik weet van de GIL. In mijn usecase zal de functie echter een IO-gebonden C-functie zijn waarvoor de python-wrapper de GIL vrijgeeft vóór de daadwerkelijke functieaanroep.

Moet ik mijn eigen threading-pool schrijven?


Antwoord 1, autoriteit 100%

Ik kwam er net achter dat er in feite eenop threads gebaseerde Pool-interface is in de multiprocessing-module, maar deze is enigszins verborgen en niet goed gedocumenteerd.

Het kan worden geïmporteerd via

from multiprocessing.pool import ThreadPool

Het wordt geïmplementeerd met behulp van een dummy Process-klasse die een python-thread omhult. Deze op threads gebaseerde Process-klasse is te vinden in multiprocessing.dummydie kort wordt genoemd in de docs. Deze dummy-module biedt zogenaamd de hele multiprocessing-interface op basis van threads.


Antwoord 2, autoriteit 51%

In Python 3 kun je concurrent.futures.ThreadPoolExecutor, dat wil zeggen:

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

Zie de docsvoor meer informatie en voorbeelden.


Antwoord 3, autoriteit 14%

Ja, en het lijkt (min of meer) dezelfde API te hebben.

import multiprocessing
def worker(lnk):
    ....    
def start_process():
    .....
....
if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)
pool.map(worker, inputs)
....

Antwoord 4, autoriteit 9%

Voor iets heel eenvoudigs en lichts (enigszins aangepast van hier):

from Queue import Queue
from threading import Thread
class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()
    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()
class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)
    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))
    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
if __name__ == '__main__':
    from random import randrange
    from time import sleep
    delays = [randrange(1, 10) for i in range(100)]
    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)
    pool = ThreadPool(20)
    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)
    pool.wait_completion()

Om callbacks te ondersteunen op de afronding van de taak, kunt u gewoon de terugbellen toevoegen aan de taaktuple.


Antwoord 5, Autoriteit 5%

Hallo om het draadpool in Python te gebruiken, kunt u deze bibliotheek gebruiken:

from multiprocessing.dummy import Pool as ThreadPool

En dan voor gebruik, deze bibliotheek doet dat:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

De threads zijn het aantal threads dat u wilt en taken zijn een lijst met taak die de meeste kaart bij de service is.


Antwoord 6, Autoriteit 2%

Hier is het resultaat dat ik eindelijk heb gebruikt. Het is een gewijzigde versie van de klassen van DGORISSEN hierboven.

Bestand: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread
class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()
    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return
    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()
class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)
    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))
    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))
    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []
    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
    def __del__(self):
        self._close_all_threads()
def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

Om het zwembad

te gebruiken

from random import randrange
from time import sleep
delays = [randrange(1, 10) for i in range(30)]
def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()

Antwoord 7

Ja, er is een draadkoolvormig zwembad vergelijkbaar met het multiprocessing pool, maar het is enigszins verborgen en niet goed gedocumenteerd. U kunt het op de volgende manier importeren: –

from multiprocessing.pool import ThreadPool

Ik zie je eenvoudig voorbeeld

def test_multithread_stringio_read_csv(self):
        # see gh-11786
        max_row_range = 10000
        num_files = 100
        bytes_to_df = [
            '\n'.join(
                ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
            ).encode() for j in range(num_files)]
        files = [BytesIO(b) for b in bytes_to_df]
        # read all files in many threads
        pool = ThreadPool(8)
        results = pool.map(self.read_csv, files)
        first_result = results[0]
        for result in results:
            tm.assert_frame_equal(first_result, result) 

Antwoord 8

De overhead van het creëren van de nieuwe processen is minimaal, vooral wanneer het slechts 4 van hen is. Ik betwijfel of dit een prestatie-hotspot van uw toepassing is. Houd het eenvoudig, geoptimaliseerd waar je moet en waar profileerresultaten naartoe wijzen.


Antwoord 9

een andere manier kan zijn om het proces toe te voegen aan de threadwachtrijpool

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)

Antwoord 10

Er is geen ingebouwde op threads gebaseerde pool. Het kan echter heel snel zijn om een producent/consumentenwachtrij te implementeren met de klasse Queue.

Van:
https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()
q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()
for item in source():
    q.put(item)
q.join()       # block until all tasks are done

Antwoord 11

Als je het niet erg vindt om de code van anderen uit te voeren, hier is de mijne:

Opmerking:er is veel extra code die u misschien wilt verwijderen [toegevoegd voor een betere verduidelijking en demonstratie van hoe het werkt]

Opmerking:Python-naamgevingsconventies werden gebruikt voor namen van methoden en variabelen in plaats van camelCase.

Werkprocedure:

  1. MultiThread-klasse wordt gestart zonder threads door het delen van lock, werkwachtrij, exit-vlag en resultaten.
  2. SingleThread wordt gestart door MultiThread zodra alle instanties zijn gemaakt.
  3. We kunnen werken toevoegen met MultiThread (het zorgt voor vergrendeling).
  4. SingleThreads verwerkt de werkwachtrij met een slot in het midden.
  5. Zodra je werk gedaan is, kun je alle threads met gedeelde booleaanse waarde vernietigen.
  6. Hier kan werk van alles zijn. Het kan automatisch de module importeren (verwijder de commentaarregel import) en verwerken met gegeven argumenten.
  7. Resultaten worden toegevoegd aan resultaten en we kunnen get_results gebruiken

Code:

import threading
import queue
class SingleThread(threading.Thread):
    def __init__(self, name, work_queue, lock, exit_flag, results):
        threading.Thread.__init__(self)
        self.name = name
        self.work_queue = work_queue
        self.lock = lock
        self.exit_flag = exit_flag
        self.results = results
    def run(self):
        # print("Coming %s with parameters %s", self.name, self.exit_flag)
        while not self.exit_flag:
            # print(self.exit_flag)
            self.lock.acquire()
            if not self.work_queue.empty():
                work = self.work_queue.get()
                module, operation, args, kwargs = work.module, work.operation, work.args, work.kwargs
                self.lock.release()
                print("Processing : " + operation + " with parameters " + str(args) + " and " + str(kwargs) + " by " + self.name + "\n")
                # module = __import__(module_name)
                result = str(getattr(module, operation)(*args, **kwargs))
                print("Result : " + result + " for operation " + operation + " and input " + str(args) + " " + str(kwargs))
                self.results.append(result)
            else:
                self.lock.release()
        # process_work_queue(self.work_queue)
class MultiThread:
    def __init__(self, no_of_threads):
        self.exit_flag = bool_instance()
        self.queue_lock = threading.Lock()
        self.threads = []
        self.work_queue = queue.Queue()
        self.results = []
        for index in range(0, no_of_threads):
            thread = SingleThread("Thread" + str(index+1), self.work_queue, self.queue_lock, self.exit_flag, self.results)
            thread.start()
            self.threads.append(thread)
    def add_work(self, work):
        self.queue_lock.acquire()
        self.work_queue._put(work)
        self.queue_lock.release()
    def destroy(self):
        self.exit_flag.value = True
        for thread in self.threads:
            thread.join()
    def get_results(self):
        return self.results
class Work:
    def __init__(self, module, operation, args, kwargs={}):
        self.module = module
        self.operation = operation
        self.args = args
        self.kwargs = kwargs
class SimpleOperations:
    def sum(self, *args):
        return sum([int(arg) for arg in args])
    @staticmethod
    def mul(a, b, c=0):
        return int(a) * int(b) + int(c)
class bool_instance:
    def __init__(self, value=False):
        self.value = value
    def __setattr__(self, key, value):
        if key != "value":
            raise AttributeError("Only value can be set!")
        if not isinstance(value, bool):
            raise AttributeError("Only True/False can be set!")
        self.__dict__[key] = value
        # super.__setattr__(key, bool(value))
    def __bool__(self):
        return self.value
if __name__ == "__main__":
    multi_thread = MultiThread(5)
    multi_thread.add_work(Work(SimpleOperations(), "mul", [2, 3], {"c":4}))
    while True:
        data_input = input()
        if data_input == "":
            pass
        elif data_input == "break":
            break
        else:
            work = data_input.split()
            multi_thread.add_work(Work(SimpleOperations(), work[0], work[1:], {}))
    multi_thread.destroy()
    print(multi_thread.get_results())

Other episodes