Wacht tot een van Future<T> is klaar

Ik heb weinig asynchrone taken uitgevoerd en ik moet wachten tot ten minste één ervan is voltooid (in de toekomst zal ik waarschijnlijk moeten wachten tot M van de N taken zijn voltooid).
Momenteel worden ze gepresenteerd als Toekomst, dus ik heb iets nodig als

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

Is er zoiets? Of iets dergelijks, niet nodig voor Future. Momenteel loop ik door de verzameling futures, controleer of er een klaar is, slaap dan een tijdje en controleer opnieuw. Dit lijkt niet de beste oplossing, want als ik voor een lange periode slaap, wordt er ongewenste vertraging toegevoegd, als ik voor een korte periode slaap, kan dit de prestaties beïnvloeden.

Ik zou kunnen proberen

. te gebruiken

new CountDownLatch(1)

en verlaag het aftellen wanneer de taak is voltooid en doe

countdown.await()

, maar ik vond het alleen mogelijk als ik Toekomstige creatie beheers. Het is mogelijk, maar vereist een herontwerp van het systeem, omdat de huidige logica van het maken van taken (verzenden van Callable naar ExecutorService) gescheiden is van de beslissing om te wachten op welke toekomst. Ik zou ook kunnen overschrijven

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

en maak een aangepaste implementatie van RunnableFuture met de mogelijkheid om een ​​luisteraar toe te voegen om op de hoogte te worden gesteld wanneer de taak is voltooid, koppel vervolgens een dergelijke luisteraar aan de benodigde taken en gebruik CountDownLatch, maar dat betekent dat ik newTaskFor moet overschrijven voor elke ExecutorService die ik gebruik – en mogelijk daar zal implementatie zijn die AbstractExecutorService niet uitbreidt. Ik zou ook kunnen proberen de ExecutorService voor hetzelfde doel in te pakken, maar dan moet ik alle methoden die Futures produceren, versieren.

Al deze oplossingen werken misschien, maar lijken erg onnatuurlijk. Het lijkt erop dat ik iets simpels mis, zoals

WaitHandle.WaitAny(WaitHandle[] waitHandles)

in c#. Zijn er bekende oplossingen voor dit soort problemen?

UPDATE:

Oorspronkelijk had ik helemaal geen toegang tot Future creation, dus er was geen elegante oplossing. Na het herontwerpen van het systeem kreeg ik toegang tot Future creation en kon ik countDownLatch.countdown() toevoegen aan het uitvoeringsproces, dan kan ik countDownLatch.await() en alles werkt goed.
Bedankt voor andere antwoorden, ik kende ExecutorCompletionService niet en het kan inderdaad nuttig zijn bij vergelijkbare taken, maar in dit specifieke geval kon het niet worden gebruikt omdat sommige Futures worden gemaakt zonder enige uitvoerder – de werkelijke taak wordt via het netwerk naar een andere server verzonden, wordt op afstand voltooid en de voltooiingsmelding wordt ontvangen.


Antwoord 1, autoriteit 100%

eenvoudig, bekijk ExecutorCompletionService.


Antwoord 2, autoriteit 16%

ExecutorService.invokeAny


Antwoord 3, autoriteit 13%

Waarom niet gewoon een resultatenwachtrij maken en in de wachtrij wachten? Of eenvoudiger, gebruik een CompletionService, want dat is het: een ExecutorService + resultatenwachtrij.


Antwoord 4, autoriteit 11%

Dit is eigenlijk vrij eenvoudig met wait() en notifyAll().

Definieer eerst een lock-object. (Je kunt hiervoor elke klasse gebruiken, maar ik ben graag expliciet):

package com.javadude.sample;
public class Lock {}

Definieer vervolgens uw werkthread. Hij moet dat lock-object op de hoogte stellen wanneer hij klaar is met zijn verwerking. Houd er rekening mee dat de melding in een gesynchroniseerde blokkering op het vergrendelde object moet staan.

package com.javadude.sample;
public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

Ten slotte kunt u uw klant schrijven:

package com.javadude.sample;
public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);
        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}

Antwoord 5, autoriteit 7%

Voor zover ik weet heeft Java geen analoge structuur als de WaitHandle.WaitAnymethode.

Het lijkt mij dat dit kan worden bereikt door een “WaitableFuture”-decorateur:

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;
    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();
        this.countDownLatch = countDownLatch;
    }
    void doTask()
    {
        super.doTask();
        this.countDownLatch.countDown();
    }
}

Hoewel dit alleen zou werken als het vóór de uitvoeringscode kan worden ingevoegd, omdat de uitvoeringscode anders niet de nieuwe methode doTask()zou hebben. Maar ik zie echt geen manier om dit te doen zonder te peilen als je op de een of andere manier geen controle kunt krijgen over het Future-object voordat het wordt uitgevoerd.

Of als de toekomst altijd in zijn eigen thread loopt, en je kunt die thread op de een of andere manier krijgen. Dan zou je een nieuwe thread kunnen spawnen om bij elkaar te voegen, en dan het wachtmechanisme afhandelen nadat de join terugkeert… Dit zou echt lelijk zijn en zou echter veel overhead veroorzaken. En als sommige Future-objecten niet worden voltooid, kunt u veel geblokkeerde threads hebben, afhankelijk van dode threads. Als je niet oppast, kan dit geheugen en systeembronnen lekken.

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);
    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }
    countDownLatch.await();
}
class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;
    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }
    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

Antwoord 6, Autoriteit 2%

Als u CompletableFutures In plaats daarvan is er CompletableFuture.anyOfdat wel Wat u wilt, vraag me gewoon aan bij het resultaat:

CompletableFuture.anyOf(futures).join()

U kunt CompletableFutureS met executors gebruiken door het CompletableFuture.supplyAsyncof runAsyncMETHODEN.


Antwoord 7

Aangezien het je niet kan schelen welke je eindigt, waarom heb je niet alleen een enkele WAITHANDLE voor alle discussies en wacht daar op? Welke het eerst is voltooid, kan het handvat instellen.


Antwoord 8

Zie deze optie:

public class WaitForAnyRedux {
private static final int POOL_SIZE = 10;
public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {
    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {
            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }
    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}
static public void main(String[] args) throws InterruptedException, ExecutionException {
    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }
    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }
    }).start();
    synchronized (integers) {
        integers.wait(3000);
    }
    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}

Other episodes