In the below code example the intention was to block the event from second$
by emitting 1 to Subject mid$
.
import { Subject, timer } from "rxjs";
import { switchMap, takeUntil, tap } from "rxjs/operators";
const first$ = timer(1000);
const second$ = timer(2000);
const mid$ = new Subject();
first$.pipe(
tap(() => {
mid$.next(1);
}),
switchMap(() => second$.pipe(
takeUntil(mid$),
tap(() => console.log("MISSED!"))
)),
).subscribe();
mid$.subscribe(() => console.log("RECEIVED"));
But it doesn’t work for some reason as the console shows:
RECEIVED
MISSED!
I.e. the event emission in line mid$.next(1);
isn’t taken into account by takeUntil(mid$)
What is the logic here?
I noticed that if I replace line mid$.next(1);
with timer(0).subscribe(() => mid$.next(1));
it works as expected, but I’d like to know what is the proper way to handle such cases in RxJS.
It does not work as intended like this
because when
mid$.next(1);
is reached, theswitchMap
‘s inner observable has not been created yet. So,takeUntil
didn’t subscribe to thatmid$
subject yet.It works with
timer(0).subscribe(() => mid$.next(1));
(which is roughly the same assetTimeout(() => mid$.next() , 0)
), because, in this case, whenmid$
emits,switchMap
has already created the inner observable.A quick way to solve this might involve using a
BehaviorSubject
instead of aSubject
, because theBehaviorSubject
will send the last emitted value to new subscribers:takeUntil unsubscribes only the next time an observable emits. It does not know whether an observable has emitted previously.
This will log
This is what’s going on:
One second passes. first$ emits and you switchMap to second$. Another second passes and second$ emits. Only now you tell it to unsubscribe after mid$ emits the next time. However mid$ has already emitted (and never emits again). If you replace second$ with interval(2000) you will see what I mean. "MISSED!" will be logged forever every 2s.