Hoe kan ik `wachten` op een Rx Observable?

Ik zou graag willen kunnen wachten op een waarneembare, bijv.

const source = Rx.Observable.create(/* ... */)
//...
await source;

Een naïeve poging resulteert erin dat het wachten onmiddellijk wordt opgelost en de uitvoering niet blokkeert

Bewerken:
De pseudocode voor mijn volledige beoogde gebruik is:

if (condition) {
  await observable;
}
// a bunch of other code

Ik begrijp dat ik de andere code naar een andere aparte functie kan verplaatsen en deze kan doorgeven aan de abonneer-callback, maar ik hoop dat te kunnen voorkomen.


Antwoord 1, autoriteit 100%

Je moet een belofte doorgeven om await. Zet de volgende gebeurtenis van het waarneembare om in een belofte en wacht daarop af.

if (condition) {
  await observable.first().toPromise();
}

Opmerking bewerken: dit antwoord gebruikte oorspronkelijk .take(1) maar is gewijzigd in .first() waarmee wordt voorkomen dat de kwestie van de belofte nooit wordt opgelost als de stream eindigt voordat een waarde is bereikt.

Vanaf RxJS v8 wordt toPromiseverwijderd. In plaats daarvan kan het bovenstaande worden vervangen door await firstValueFrom(observable)


Antwoord 2, autoriteit 20%

Gebruik de nieuwe firstValueFrom()of lastValueFrom()in plaats van toPromise(), die, zoals hier aangegeven, wordt afgeraden vanaf RxJS 7, en wordt verwijderd in RxJS 8.

import { firstValueFrom} from 'rxjs';
import { lastValueFrom } from 'rxjs';
this.myProp = await firstValueFrom(myObservable$);
this.myProp = await lastValueFrom(myObservable$);

Dit is beschikbaar in RxJS 7+

Zie: https://indepth.dev/rxjs-heads -up-topromise-wordt-deprecated/


Antwoord 3, autoriteit 16%

Het moet waarschijnlijk

await observable.first().toPromise();

Zoals eerder in opmerkingen werd opgemerkt, is er een aanzienlijk verschil tussen de operators take(1)en first()wanneer er een leeg voltooid waarneembaar is.

Observable.empty().first().toPromise()resulteert in afwijzing met EmptyErrordie dienovereenkomstig kan worden afgehandeld, omdat er echt geen waarde was.

En Observable.empty().take(1).toPromise()zal resulteren in een resolutie met een undefinedwaarde.


Antwoord 4, autoriteit 7%

Je moet awaitop een belofte, dus je zult toPromise()willen gebruiken. Zie ditvoor meer details op toPromise().


Antwoord 5, autoriteit 6%

Bewerken:

.toPromise()is nu verouderd in RxJS 7 (bron: https ://rxjs.dev/deprecations/to-promise)

Nieuw antwoord:

Als vervanging voor de verouderde methode toPromise() moet u gebruiken
een van de twee ingebouwde statische conversiefuncties firstValueFrom of
lastValueFrom.

Voorbeeld:

import { interval, lastValueFrom } from 'rxjs';
import { take } from 'rxjs/operators';
async function execute() {
  const source$ = interval(2000).pipe(take(10));
  const finalNumber = await lastValueFrom(source$);
  console.log(`The final number is ${finalNumber}`);
}
execute();
// Expected output:
// "The final number is 9"

Oud antwoord:

Als toPromisevoor u verouderd is, kunt u .pipe(take(1)).toPromisegebruiken, maar zoals u kunt zien hieris het niet verouderd.

Dus gebruik gewoon toPromise(RxJs 6) zoals gezegd:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = sample('First Example')
  .toPromise()
  //output: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

async/wait-voorbeeld:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = await sample('First Example').toPromise()
// output: 'First Example'
console.log('From Promise:', result);

Lees hier.


Antwoord 6, autoriteit 2%

Het gebruik van toPromise()wordt niet aanbevolen, aangezien het vanaf RxJs 7 wordt afgeschreven. U kunt twee nieuwe operators gebruiken die aanwezig zijn in RxJs 7 lastValueFrom()en firstValueFrom(). Meer details vindt u hier

const result = await lastValueFrom(myObservable$);

Implementaties in de bètaversie zijn hier beschikbaar:


Antwoord 7

Ik gebruik RxJS V 6.4.0, daarom zou ik een verouderde versie in V 7.x.x toPromise()moeten gebruiken. Geïnspireerd door andere antwoorden, hier is wat ik deed met toPromise()

import { first, ... } from 'rxjs/operators';
...
if (condition) {
  await observable$.pipe(first()).toPromise();
}
...

Merk op hoe ik last()gebruikte in een pipe(). Omdat op de mijne observable.first()niet werkt zoals vermeld door macil

Ik hoop dat dit anderen helpt die RxJS V 6.x.x gebruiken zoals ik :).

Bedankt.

Other episodes