Thread pooling in C++11

Relevante vragen:

Over C++11:

Over Boost:


Hoe krijg ik een pool van discussielijnenom taken naar te sturen, zonder ze steeds opnieuw te maken en te verwijderen? Dit betekent dat permanente threads opnieuw moeten worden gesynchroniseerd zonder lid te worden.


Ik heb code die er als volgt uitziet:

namespace {
  std::vector<std::thread> workers;
  int total = 4;
  int arr[4] = {0};
  void each_thread_does(int i) {
    arr[i] += 2;
  }
}
int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

In plaats van elke iteratie threads te maken en eraan deel te nemen, zou ik er de voorkeur aan geven taken elke iteratie naar mijn werkthreads te sturen en ze slechts één keer te maken.


Antwoord 1, autoriteit 100%

Dit is gekopieerd van mijn antwoord naar een ander zeer vergelijkbaar bericht, ik hoop dat het kan helpen:

  1. Begin met het maximale aantal threads dat een systeem kan ondersteunen:

    int Num_Threads = thread::hardware_concurrency();

  2. Voor een efficiënte threadpool-implementatie is het beter om geen nieuwe te maken of oude te vernietigen (door lid te worden), zodra threads zijn gemaakt volgens Num_Threads. Er zal een prestatieverlies zijn, het kan zelfs zijn dat uw applicatie langzamer gaat dan de seriële versie.

Elke C++11-thread zou in zijn functie moeten draaien met een oneindige lus, constant wachtend op nieuwe taken om te grijpen en uit te voeren.

Hier leest u hoe u een dergelijke functie aan de threadpool kunt koppelen:

int Num_Threads = thread::hardware_concurrency();
vector<thread> Pool;
for(int ii = 0; ii < Num_Threads; ii++)
{  Pool.push_back(thread(Infinite_loop_function));}
  1. De oneindige_loop_functie

Dit is een “while(true)”-lus die wacht op de taakwachtrij

void The_Pool:: Infinite_loop_function()
{
    while(true)
    {
        {
            unique_lock<mutex> lock(Queue_Mutex);
            condition.wait(lock, [this](){return !Queue.empty() || terminate_pool;});
            Job = Queue.front();
            Queue.pop();
        }
        Job(); // function<void()> type
    }
};
  1. Maak een functie om een taak aan uw wachtrij toe te voegen
void The_Pool:: Add_Job(function<void()> New_Job)
{
    {
         unique_lock<mutex> lock(Queue_Mutex);
         Queue.push(New_Job);
    }
        condition.notify_one();
}
  1. Een willekeurige functie aan uw wachtrij binden
   Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));

Als je deze ingrediënten eenmaal hebt geïntegreerd, heb je je eigen dynamische threading-pool. Deze threads lopen altijd, wachtend op de taak om te doen.

Mijn excuses als er syntaxisfouten zijn, ik heb deze code getypt en ik heb een slecht geheugen. Sorry dat ik je niet de volledige threadpoolcode kan geven, dat zou mijn werkintegriteit schenden.

Bewerken: om de pool te beëindigen, roept u de methode shutdown() aan:

XXXX::shutdown()
{
    {
        unique_lock<mutex> lock(threadpool_mutex);
        terminate_pool = true; // use this flag in condition.wait
    }
    condition.notify_all(); // wake up all threads.
    // Join all threads.
    for(std::thread &every_thread : thread_vector)
    {   every_thread.join();}
    thread_vector.clear();  
    stopped = true; // use this flag in destructor, if not set, call shutdown() 
}

Antwoord 2, autoriteit 73%

Je kunt C++ Thread Pool Library gebruiken, https://github.com/vit-vit/ctpl.

Dan kan de code die je hebt geschreven worden vervangen door het volgende

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library
int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}

U krijgt het gewenste aantal threads en u zult ze niet steeds opnieuw maken en verwijderen tijdens de iteraties.


Antwoord 3, autoriteit 55%

Een pool van threads betekent dat al uw threads de hele tijd actief zijn – met andere woorden, de thread-functie keert nooit terug. Om de threads iets zinvols te geven, moet u een systeem van communicatie tussen threads ontwerpen, zowel om de thread te vertellen dat er iets te doen is, als om de feitelijke werkgegevens te communiceren.

Normaal gesproken zal dit een soort gelijktijdige gegevensstructuur met zich meebrengen, en elke thread zou vermoedelijk slapen op een soort conditievariabele, die op de hoogte zou worden gesteld wanneer er werk te doen is. Na ontvangst van de melding worden een of meerdere threads wakker, herstellen een taak uit de gelijktijdige gegevensstructuur, verwerken deze en slaan het resultaat op een analoge manier op.

De discussie gaat dan verder om te controleren of er nog meer werk te doen is, en zo niet, ga dan weer slapen.

Het resultaat is dat je dit allemaal zelf moet ontwerpen, aangezien er geen natuurlijk begrip van ‘werk’ is dat universeel toepasbaar is. Het is nogal wat werk en er zijn enkele subtiele problemen die je moet oplossen. (Je kunt in Go programmeren als je van een systeem houdt dat achter de schermen het threadbeheer voor je regelt.)


Antwoord 4, autoriteit 16%

Een threadpool is in wezen een set threads die allemaal zijn gekoppeld aan een functie die werkt als een gebeurtenislus. Deze threads zullen eindeloos wachten op het uitvoeren van een taak of op hun eigen beëindiging.

De taak van de threadpool is om een interface te bieden om taken in te dienen, het beleid voor het uitvoeren van deze taken te definiëren (en misschien te wijzigen) (planningsregels, thread-instantiatie, grootte van de pool) en de status van de threads en gerelateerde bronnen te bewaken .

Dus voor een veelzijdige pool moet men beginnen met te definiëren wat een taak is, hoe deze wordt gelanceerd, onderbroken, wat het resultaat is (zie het begrip belofte en toekomst voor die vraag), wat voor soort gebeurtenissen de threads zullen waarop ze moeten reageren, hoe ze ermee zullen omgaan, hoe deze gebeurtenissen zullen worden onderscheiden van de gebeurtenissen die door de taken worden afgehandeld. Dit kan behoorlijk ingewikkeld worden, zoals je kunt zien, en beperkingen opleggen aan hoe de threads zullen werken, naarmate de oplossing steeds meer betrokken raakt.

De huidige tooling voor het afhandelen van gebeurtenissen is vrij barebones(*): primitieven zoals mutexen, conditievariabelen en een paar abstracties daarbovenop (sloten, barrières). Maar in sommige gevallen kunnen deze onttrekkingen ongeschikt blijken te zijn (zie deze gerelateerde vraag ), en men moet terugkeren naar het gebruik van de primitieven.

Andere problemen moeten ook worden opgelost:

  • signaal
  • i/o
  • hardware (processoraffiniteit, heterogene setup)

Hoe zouden deze uitpakken in jouw omgeving?

Dit antwoordop een vergelijkbare vraag wijst op een bestaande implementatie bedoeld voor boost en de stl.

Ik heb een zeer grove implementatievan een threadpool aangeboden voor een andere vraag, die niet veel problemen behandelt die hierboven zijn beschreven. Misschien wil je daarop voortbouwen. Misschien wil je ook eens kijken naar bestaande frameworks in andere talen om inspiratie op te doen.


(*) Ik zie dat niet als een probleem, integendeel. Ik denk dat het de geest van C++ is, geërfd van C.


Antwoord 5, autoriteit 8%

Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>
class Function_pool
{
private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;
public:
    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

function_pool.cpp

#include "function_pool.h"
Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}
Function_pool::~Function_pool()
{
}
void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}
void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}
void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>
Function_pool func_pool;
class quit_worker_exception : public std::exception {};
void example_function()
{
    std::cout << "bla" << std::endl;
}
int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }
    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}

Antwoord 6, autoriteit 5%

Je kunt thread_poolgebruiken van boost-bibliotheek:

void my_task(){...}
int main(){
    int threadNumbers = thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);
    // Submit a function to the pool.
    boost::asio::post(pool, my_task);
    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });
}

Je kunt ook threadpoolvan de open source-community gebruiken:

void first_task() {...}    
void second_task() {...}
int main(){
    int threadNumbers = thread::hardware_concurrency();
    pool tp(threadNumbers);
    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}

Antwoord 7, autoriteit 2%

Zoiets zou kunnen helpen (overgenomen van een werkende app).

#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
struct thread_pool {
  typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
  thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      auto worker = [this] { return service.run(); };
      grp.add_thread(new boost::thread(worker));
    }
  }
  template<class F>
  void enqueue(F f) {
    service.post(f);
  }
  ~thread_pool() {
    service_worker.reset();
    grp.join_all();
    service.stop();
  }
private:
  boost::asio::io_service service;
  asio_worker service_worker;
  boost::thread_group grp;
};

U kunt het als volgt gebruiken:

thread_pool pool(2);
pool.enqueue([] {
  std::cout << "Hello from Task 1\n";
});
pool.enqueue([] {
  std::cout << "Hello from Task 2\n";
});

Houd in gedachten dat het opnieuw uitvinden van een efficiënt asynchrone wachtrijingmechanisme niet triviaal is.

Boost :: ASIO :: IO_Service is een zeer efficiënte implementatie, of is eigenlijk een verzameling platformspecifieke wrappers (bijvoorbeeld het wikkelt I / O-voltooiingspoorten op Windows).


Antwoord 8, Autoriteit 2%

bewerken: dit vereist nu C++ 17 en concepten. (Vanaf 9/12/16 is alleen G ++ 6.0+ voldoende.)

De template-aftrek is echter een stuk nauwkeuriger vanwege het, dus het is de moeite waard om een ​​nieuwere compiler te krijgen. Ik heb nog geen functie gevonden die expliciete sjabloonargumenten vereist.

Het kost nu ook een geschikte vulbare object (en is nog steeds statisch typesafe !!! ).

Het bevat nu ook een optionele groene draadsprijheidsraadpool met behulp van dezelfde API. Deze klasse is echter alleen Posix. Het gebruikt de ucontext_tAPI voor GebruikersPace Task-switching.


Ik heb hiervoor een eenvoudige bibliotheek gemaakt. Een voorbeeld van gebruik wordt hieronder gegeven. (Ik beantwoord dit omdat het een van de dingen was die ik vond voordat ik besloot dat het nodig was om het zelf te schrijven.)

bool is_prime(int n){
  // Determine if n is prime.
}
int main(){
  thread_pool pool(8); // 8 threads
  list<future<bool>> results;
  for(int n = 2;n < 10000;n++){
    // Submit a job to the pool.
    results.emplace_back(pool.async(is_prime, n));
  }
  int n = 2;
  for(auto i = results.begin();i != results.end();i++, n++){
    // i is an iterator pointing to a future representing the result of is_prime(n)
    cout << n << " ";
    bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
    if(prime)
      cout << "is prime";
    else
      cout << "is not prime";
    cout << endl;
  }  
}

Je kunt elke functie asyncdoorgeven met elke (of ongeldige) retourwaarde en alle (of geen) argumenten en het zal een overeenkomstige std::futureretourneren. Om het resultaat te krijgen (of gewoon te wachten tot een taak is voltooid), roept u get()in de toekomst aan.

Hier is de github: https://github.com/Tyler-Hardin/thread_pool.


Antwoord 9, autoriteit 2%

Dit is een andere threadpool-implementatie die heel eenvoudig, gemakkelijk te begrijpen en te gebruiken is, alleen de C++11-standaardbibliotheek gebruikt en die kan worden bekeken of aangepast voor uw gebruik, zou een leuke starter moeten zijn als u in het gebruik van threadpools:

https://github.com/progschj/ThreadPool


Antwoord 10

Een threadpool zonder afhankelijkheden buiten STL is heel goed mogelijk. Ik heb onlangs een kleine header-only threadpool-bibliotheekgeschreven om exact hetzelfde probleem aan te pakken. Het ondersteunt het dynamisch wijzigen van de grootte van de pool (het aantal werkers wijzigen tijdens runtime), wachten, stoppen, pauzeren, hervatten enzovoort. Ik hoop dat je het nuttig vindt.

Other episodes