RxJS: debounce a stream only if distinct

I want to debounce a stream – but only if the source value is the same as before. How would I do this with RxJS 5?

I do not want to emit a value if the value is the same and I emitted it previously within a specified time window. I should be able to use the value from the stream – or compare function similar to distinctUntilChanged.

11 thoughts on “RxJS: debounce a stream only if distinct”

  1. This rxjs6+ operator will emit when the source ‘value’ has changed or when some ‘delay’ time has passed since last emit (even if ‘value’ has not changed):

    export function throttleUntilChanged(delay: number) {
      return (source: Observable<any>) => {
        return new Observable(observer => {
    
          let lastSeen = {};
          let lastSeenTime = 0;
    
          return source
            .pipe(
              flatMap((value: any) => {
                const now = Date.now();
                if (value === lastSeen && (now - lastSeenTime) < delay ) {
                  return empty();
                } else {
                  lastSeen = value;
                  lastSeenTime = now;
                  return of(value);
                }
              })
            )
            .subscribe(observer);
        });
      };
    }
    
    Reply
  2. It depends on what you’re trying to do; I came upon this question when I was trying to do something similar, basically debouncing but with different debounces for different values of an object.

    After trying the solution from jayphelps I couldn’t get it to behave as I wanted. After much back and forth, turns out there is an in built easy way to do it: groupby.

    const priceUpdates = [
      {bid: 10, id: 25},
      {bid: 20, id: 30},
      {bid: 11, id: 25},
      {bid: 21, id: 30},
      {bid: 25, id: 30}
    ];//emit each person
    const source = Rx.Observable.from(priceUpdates);
    //group by age
    const example = source
      .groupBy(bid => bid.id)
      .mergeMap(group$ => group$.debounceTime(500))
    
    const subscribe = example.subscribe(val => console.log(val));
    

    Output:

    [object Object] {
      bid: 11,
      id: 25
    }
    [object Object] {
      bid: 25,
      id: 30
    }
    

    Jsbin: http://jsbin.com/savahivege/edit?js,console

    This code will group by the bid ID and debounce on that, so therefore only send the last values for each.

    Reply
  3. update for rxjs 6 :

     source$
    .pipe(
            // debounceTime(300),  optionally un-comment this to add debounce
            distinctUntilChanged(),
        )
    .subscribe(v => console.log(v))
    
    Reply
  4. I’m not aware of any way to do this with without creating your own operator because you need to maintain some sort of state (the last seen value).

    One way looks something like this:

    // I named this debounceDistinctUntilChanged but that might not be
    // the best name. Name it whatever you think makes sense!
    
    function debounceDistinctUntilChanged(delay) {
      const source$ = this;
    
      return new Observable(observer => {
        // Using an object as the default value
        // so that the first time we check it
        // if its the same its guaranteed to be false
        // because every object has a different identity.
        // Can't use null or undefined because source may
        // emit these!
        let lastSeen = {};
    
        return source$
          .debounce(value => {
            // If the last value has the same identity we'll
            // actually debounce
            if (value === lastSeen) {
              return Observable.timer(delay);
            } else {
              lastSeen = value;
              // This will complete() right away so we don't actually debounce/buffer
              // it at all
              return Observable.empty();
            }
          })
          .subscribe(observer);
      });
    }
    

    Now that you see an implementation you may (or may not) find it differs from your expectations. Your description actually left out certain details, like if it should only be the last value you keep during the debounce time frame or if it’s a set–basically distinctUntilChanged vs. distinct. I assumed the later.

    Either way hopefully this gives you a starting point and reveals how easy it is to create custom operators. The built in operators definitely do not provide solutions for everything as-is, so any sufficiently advanced app will need to make their own (or do the imperative stuff inline without abstracting it, which is fine too).

    You can then use this operator by putting it on the Observable prototype:

    Observable.prototype.debounceDistinctUntilChanged = debounceDistinctUntilChanged;
    
    // later
    source$
      .debounceDistinctUntilChanged(400)
      .subscribe(d => console.log(d));
    

    Or by using let:

    // later
    source$
      .let(source$ => debounceDistinctUntilChanged.call($source, 400))
      .subscribe(d => console.log(d));
    

    If you can, I recommend truly understanding what my code does, so that in the future you are able to easily make your own solutions.

    Reply
  5. Here is my RXJS 6+ version in typescript that works 100% as originally requested. Debounce (restart timer) on every new source value. Emit value only if the new value is different from the previous value or the debounce time has expired.

    // custom rxjs operator to debounce while the source emits the same values
    debounceDistinct<T>(delay: number) {
        return (source: Observable<T>): Observable<T> => {
            return new Observable(subscriber => {
                let hasValue = false;
                let lastValue: T | null = null;
                let durationSub: Subscription = null;
    
                const emit = () => {
                    durationSub?.unsubscribe();
                    durationSub = null;
                    if (hasValue) {
                        // We have a value! Free up memory first, then emit the value.
                        hasValue = false;
                        const value = lastValue!;
                        lastValue = null;
                        subscriber.next(value);
                    }
                };
    
                return source.subscribe(
                    (value: T) => {
                        // new value received cancel timer
                        durationSub?.unsubscribe();
                        // emit lastValue if the value has changed
                        if (hasValue && value !== lastValue) {
                            const value = lastValue!;
                            subscriber.next(value);
                        }
                        hasValue = true;
                        lastValue = value;
                        // restart timer
                        durationSub = timer(delay).subscribe(() => {
                            emit();
                        });
                    },
                    error => {
                    },
                    () => {
                        emit();
                        subscriber.complete();
                        lastValue = null;
                    });
            });
        }
    }
    
    Reply
  6. Providing an answer for RxJS 6+ with the method suggested by @samberic in an ngrx effect to group actions coming from a same source id with RxJS 6.

    this.actions$.pipe(
        ofType(actionFoo, actionBar), // Two different ngrx action with an id property
        groupBy(action => action.id), // Group by the id from the source
        mergeMap(action => action.pipe(
            debounceTime(5000)
        ))
    ).pipe(
        // Do whatever it is that your effect is supposed to do!
    )
    
    Reply

Leave a Comment