RxJS: takeUntil ignores Subject event

In the code example below, the intention is to block the event from second$ by emitting 1 on the Subject mid$.

import { Subject, timer } from "rxjs";
import { switchMap, takeUntil, tap } from "rxjs/operators";

const first$ = timer(1000);
const second$ = timer(2000);

const mid$ = new Subject();

first$.pipe(
  tap(() => { 
    mid$.next(1); 
  }),
  switchMap(() => second$.pipe(
    takeUntil(mid$),
    tap(() => console.log("MISSED!"))
  )),
).subscribe();


mid$.subscribe(() => console.log("RECEIVED"));

But it doesn’t work for some reason, as the console shows:

RECEIVED
MISSED!

I.e., the emission in the line mid$.next(1); isn’t taken into account by takeUntil(mid$).

What is the logic here?

I noticed that if I replace the line mid$.next(1); with timer(0).subscribe(() => mid$.next(1)); it works as expected, but I’d like to know what the proper way to handle such cases in RxJS is.

41 thoughts on “RxJS: takeUntil ignores Subject event”

  1. The code does not work as intended when written like this:

    const first$ = timer(1000);
    const second$ = timer(2000);
    
    const mid$ = new Subject();
    
    first$.pipe(
      tap(() => { 
        mid$.next(1);
      }),
      switchMap(() => second$.pipe(
        takeUntil(mid$),
        tap(() => console.log("MISSED!"))
      )),
    ).subscribe();
    

    because when mid$.next(1); is reached, switchMap’s inner observable has not been created yet. So takeUntil has not subscribed to the mid$ subject yet, and a plain Subject does not replay past values to later subscribers.

    It works with timer(0).subscribe(() => mid$.next(1)); (which is roughly the same as setTimeout(() => mid$.next(1), 0)) because, in this case, by the time mid$ emits, switchMap has already created the inner observable and takeUntil has subscribed to mid$.

    A quick way to solve this might involve using a BehaviorSubject instead of a Subject, because the BehaviorSubject will send the last emitted value to new subscribers:

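    import { BehaviorSubject, timer } from "rxjs";
    import { switchMap, takeUntil, tap } from "rxjs/operators";

    const first$ = timer(1000);
    const second$ = timer(2000);
    
    const mid$ = new BehaviorSubject(null);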
    
    first$.pipe(
      tap(() => { 
        mid$.next(1);
      }),
      switchMap(() => second$.pipe(
        // here, when `mid$` is subscribed, the subscriber will receive `1` 
        // and the entire inner observable will complete
        takeUntil(mid$),
        tap(() => console.log("MISSED!"))
      )),
    ).subscribe();
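
The difference between the two subject types can be sketched without RxJS. The toy classes below (ToySubject and ToyBehaviorSubject are hypothetical names, not RxJS's implementation) only model the delivery rule: a plain subject notifies listeners that were registered when next() was called, while a behavior-style subject also replays its current value to each new listener. The replay includes the seed value, so in RxJS a takeUntil(mid$) on a BehaviorSubject would complete the inner observable immediately even if mid$.next(1) had never been called.

```typescript
type Listener<T> = (value: T) => void;

// Toy model of a plain subject: a value reaches only the listeners
// that were registered before next() was called.
class ToySubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.listeners.slice().forEach((listener) => listener(value));
  }
}

// Toy model of a behavior-style subject: it remembers the current value
// and hands it to every new listener right away.
class ToyBehaviorSubject<T> {
  private listeners: Listener<T>[] = [];
  private current: T;

  constructor(seed: T) {
    this.current = seed;
  }

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
    listener(this.current); // replay on subscribe, seed included
  }

  next(value: T): void {
    this.current = value;
    this.listeners.slice().forEach((listener) => listener(value));
  }
}

const plainLog: string[] = [];
const plain = new ToySubject<number>();
plain.next(1); // nobody is listening yet, so the value is lost
plain.subscribe((v) => { plainLog.push(`plain: ${v}`); });

const replayLog: string[] = [];
const replaying = new ToyBehaviorSubject<number | null>(null);
replaying.next(1);
replaying.subscribe((v) => { replayLog.push(`replayed: ${v}`); });

const seedLog: string[] = [];
const untouched = new ToyBehaviorSubject<number | null>(null);
untouched.subscribe((v) => { seedLog.push(`seed: ${v}`); });

console.log(plainLog, replayLog, seedLog);
// [] [ 'replayed: 1' ] [ 'seed: null' ]
```

If the seed replay is not wanted, a subject that replays only values that were actually emitted (RxJS offers ReplaySubject for this) may be a closer fit, depending on the use case.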
    
  2. takeUntil completes the stream only the next time its notifier emits after takeUntil has subscribed to it. It does not know whether the notifier has emitted previously.

    const first$ = timer(1000);
    const second$ = timer(2000);
    
    const mid$ = new Subject();
    
    first$.pipe(
      tap((first) => {
        console.log('first', first)
        mid$.next(1);
        console.log('first', first)
      }),
      switchMap(() => second$.pipe(
        tap((second) => console.log('second', second)),
        takeUntil(mid$), // Unsubscribing next time mid$ emits
        tap(() => console.log("MISSED!"))
      )),
    ).subscribe(second => console.log('final', second));
    
    
    mid$.subscribe(() => console.log("RECEIVED"));
    

    This will log

    // 1s passes
    first 0
    RECEIVED
    first 0
    // 2s pass
    second 0
    MISSED!
    final 0
    

    This is what’s going on:

    One second passes and first$ emits. tap calls mid$.next(1) first, and only afterwards does switchMap subscribe to the inner observable, which is when takeUntil starts listening to mid$. From then on it waits for the next emission of mid$, but mid$ has already emitted (and never emits again). Two more seconds pass, second$ emits, and the value goes through. If you replace second$ with interval(2000) you will see what I mean: "MISSED!" will be logged every 2s, forever.
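
The ordering described above can be reproduced synchronously with a small stand-in. In this sketch, ToySubject and forwardUntil are hypothetical helpers (not RxJS code) that only mimic the relevant rule: the guard starts listening to the notifier when it is set up, so an emission that happened earlier cannot stop it.

```typescript
type Listener<T> = (value: T) => void;

// Toy stand-in for a Subject (hypothetical, not RxJS's implementation).
class ToySubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.listeners.slice().forEach((listener) => listener(value));
  }
}

// Toy stand-in for takeUntil: forwards source values to `sink` until the
// notifier emits. It begins listening to the notifier only when called.
// Unlike the real operator, it just ignores later values instead of
// unsubscribing from the source.
function forwardUntil<T, N>(
  source: ToySubject<T>,
  notifier: ToySubject<N>,
  sink: Listener<T>
): void {
  let stopped = false;
  notifier.subscribe(() => { stopped = true; });
  source.subscribe((value) => { if (!stopped) sink(value); });
}

const ticks = new ToySubject<number>();  // plays the role of interval(2000)
const midToy = new ToySubject<number>(); // plays the role of mid$
const seen: number[] = [];

midToy.next(1);                          // emitted before the guard listens: lost
forwardUntil(ticks, midToy, (v) => { seen.push(v); });
ticks.next(0);                           // forwarded
ticks.next(1);                           // forwarded: the earlier emission has no effect
midToy.next(2);                          // first emission the guard actually observes
ticks.next(2);                           // blocked

console.log(seen); // [ 0, 1 ]
```

In the original code mid$ never emits a second time, so nothing corresponds to the midToy.next(2) call and the inner observable is never stopped.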
