ThreadPoolExecutor blokkeren wanneer wachtrij vol is?

Ik probeer veel taken uit te voeren met behulp van een ThreadPoolExecutor. Hieronder is een hypothetisch voorbeeld:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

Het probleem is dat ik snel een java.util.concurrent.RejectedExecutionExceptionkrijg, aangezien het aantal taken groter is dan de werkwachtrij. Het gewenste gedrag waarnaar ik op zoek ben, is echter om het hoofdthreadblok te hebben totdat er ruimte in de wachtrij is. Wat is de beste manier om dit te bereiken?


Antwoord 1, autoriteit 100%

In sommige zeer beperkte omstandigheden kunt u een java.util.concurrent.RejectedExecutionHandler implementeren die doet wat u nodig hebt.

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};
ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

Nu. Dit is een heel slecht ideeom de volgende redenen

  • Het is vatbaar voor een impasse omdat alle threads in de pool kunnen sterven voordat het ding dat je in de wachtrij plaatst zichtbaar is. Beperk dit door een redelijke keep-alive-tijd in te stellen.
  • De taak is niet ingepakt zoals uw uitvoerder mag verwachten. Veel uitvoerende implementaties wikkelen hun taken in een soort volgobject voordat ze worden uitgevoerd. Kijk naar de bron van jou.
  • Toevoegen via getQueue() wordt sterk afgeraden door de API en kan op een bepaald moment worden verboden.

Een bijna altijd betere strategie is om ThreadPoolExecutor.CallerRunsPolicy te installeren, waardoor uw app wordt afgeremd door de taak uit te voeren op de thread die execute() aanroept.

Soms is een blokkeringsstrategie, met alle bijbehorende risico’s, echter echt wat u wilt. Ik zou zeggen onder deze voorwaarden

  • Je hebt maar één thread die execute() aanroept
  • U moet (of wilt) een zeer korte wachtrij hebben
  • Je moet absoluut het aantal threads dat dit werk uitvoert, beperken (meestal om externe redenen), en een strategie voor het uitvoeren van een beller zou dat doorbreken.
  • Uw taken zijn van onvoorspelbare grootte, dus caller-runs kunnen leiden tot hongersnood als de pool even bezig was met 4 korte taken en uw ene thread-calling-executie vast kwam te zitten met een grote.

Dus, zoals ik al zei. Het is zelden nodig en kan gevaarlijk zijn, maar daar ga je.

Veel succes.


Antwoord 2, autoriteit 23%

Wat u moet doen, is uw ThreadPoolExecutor in Executor inpakken, wat expliciet het aantal gelijktijdig uitgevoerde bewerkingen erin beperkt:

private static class BlockingExecutor implements Executor {
    final Semaphore semaphore;
    final Executor delegate;
    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }
    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };
        delegate.execute(wrapped);
    }
}

U kunt concurrentTasksLimit aanpassen aan de threadPoolSize + queueSize van uw gedelegeerde uitvoerder en het zal uw probleem vrijwel oplossen


Antwoord 3, autoriteit 11%

Je zou een semaphorekunnen gebruiken om te voorkomen dat threads in de pool terechtkomen.

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);
Semaphore lock = new Semaphore(6); // equal to queue capacity
for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Enkele valkuilen:

  • Gebruik dit patroon alleen met een vaste draadpool. Het is onwaarschijnlijk dat de wachtrij vaak vol is, dus er worden geen nieuwe threads gemaakt. Bekijk de java-documenten op ThreadPoolExecutor voor meer details: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.htmlEr is een manier om dit te omzeilen, maar het valt buiten het bestek van dit antwoord.
  • De wachtrijgrootte moet groter zijn dan het aantal kernthreads. Als we wachtrijgrootte 3 zouden maken, zou er uiteindelijk het volgende gebeuren:

    • T0: alle drie de threads doen hun werk, de wachtrij is leeg, er zijn geen vergunningen beschikbaar.
    • T1: Onderwerp 1 is afgelopen, geeft een vergunning vrij.
    • T2: Thread 1 doorzoekt de wachtrij voor nieuw werk, vindt er geen en wacht.
    • T3: Hoofdthread levert werk in de pool, thread 1 begint te werken.

    Het bovenstaande voorbeeld vertaalt zich in de hoofdthread blokkerendethread 1. Het lijkt misschien een korte periode, maar vermenigvuldig nu de frequentie met dagen en maanden. Plotseling vormen korte perioden een grote hoeveelheid tijdverspilling.


Antwoord 4, autoriteit 8%

Dit is wat ik uiteindelijk deed:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}

Antwoord 5

Hier is mijn codefragment in dit geval:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
        catch ( RejectedExecutionException e ) {
            // Thread pool is full
            try {
                // Block until one of the threads finishes its job and exits.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait();
                }
            }
            catch ( InterruptedException ignored ) {
                // return immediately
                break;
            }
        }
    } while ( !accepted );
}

threadPool is een lokale instantie van java.util.concurrent.ExecutorService die al is geïnitialiseerd.


Antwoord 6

Een redelijk eenvoudige optie is om uw BlockingQueuein te pakken met een implementatie die put(..)aanroept wanneer offer(..)is wordt aangeroepen:

public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {
(..)
  public boolean offer(E o) {
        try {
            delegate.put(o);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
  }
(.. implement all other methods simply by delegating ..)
}

Dit werkt omdat put(..)standaard wacht tot er capaciteit in de wachtrij is wanneer deze vol is, zie:

   /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

Geen opsporing van RejectedExecutionExceptionof ingewikkelde vergrendeling nodig.


Antwoord 7

Ik heb dit probleem opgelost met een aangepaste RejectedExecutionHandler, die de aanroepende thread gewoon een tijdje blokkeert en vervolgens de taak opnieuw probeert in te dienen:

public class BlockWhenQueueFull implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}
        executor.execute(r);
    }
}

Deze klasse kan gewoon worden gebruikt in de thread-pool executor als een RejectedExecutionHandler zoals elke andere. In dit voorbeeld:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())

Het enige nadeel dat ik zie, is dat de aanroepende thread iets langer kan worden vergrendeld dan strikt noodzakelijk (tot 250 ms). Voor veel kortlopende taken kunt u de wachttijd misschien verkorten tot 10 ms of zo. Bovendien, aangezien deze uitvoerder in feite recursief wordt aangeroepen, kunnen zeer lange wachttijden tot een thread beschikbaar komt (uren) resulteren in een stack-overflow.

Toch vind ik deze methode persoonlijk prettig. Het is compact, gemakkelijk te begrijpen en werkt goed. Mis ik iets belangrijks?


Antwoord 8

Ok, oude thread, maar dit is wat ik vond bij het zoeken naar blocking thread-executor. Mijn code probeert een semafoor te krijgen wanneer de taak wordt ingediend bij de taakwachtrij. Dit blokkeert als er geen semaforen meer zijn. Zodra een taak is voltooid, wordt de semafoor vrijgegeven met de decorateur. Het enge deel is dat er een mogelijkheid is om semafoor te verliezen, maar dat kan worden opgelost met bijvoorbeeld een getimede taak die alleen semaforen op een getimede basis wist.

Dus hier mijn oplossing:

class BlockingThreadPoolTaskExecutor(concurrency: Int) : ThreadPoolTaskExecutor() {
    companion object {
        lateinit var semaphore: Semaphore
    }
    init {
        semaphore = Semaphore(concurrency)
        val semaphoreTaskDecorator = SemaphoreTaskDecorator()
        this.setTaskDecorator(semaphoreTaskDecorator)
    }
    override fun <T> submit(task: Callable<T>): Future<T> {
        log.debug("submit")
        semaphore.acquire()
        return super.submit(task)
    }
}
private class SemaphoreTaskDecorator : TaskDecorator {
    override fun decorate(runnable: Runnable): Runnable {
        log.debug("decorate")
        return Runnable {
            try {
                runnable.run()
            } finally {
                log.debug("decorate done")
                semaphore.release()
            }
        }
    }
}

Other episodes