RxJS Observable: publish().refCount() vs share()

The two common ways to share data coming from an observable are the publish().refCount() and share() methods. At first glance they might seem similar, but there is one difference between the two techniques that may significantly affect which one you choose.

With observable.publish().refCount(), once the source observable completes, it will not restart if a new subscriber is added after completion. With observable.share(), if the source observable completes and a new subscriber is added later, the source effectively begins a new execution and starts emitting data again.
Let's look at some examples to understand what that actually means. There are two scenarios to consider when dealing with shared observables. The examples use the RxJS 5 chained-operator style.

Scenario 1

Late subscription after the underlying observable completes, without the earlier subscriber having unsubscribed
// Create a source that emits three random values, one every 100 ms, then completes
var source$ = Rx.Observable.interval(100).take(3).map(index => `${index}-${Math.floor(50 + Math.random() * 100)}` );

// Create a refcounted observable
var refcounted$ = source$.publish().refCount();

refcounted$.subscribe(x => console.log('A' + x));

// Simulate a late subscriber
setTimeout(() => {
  // By the time B subscribes, the source$ has already completed
  refcounted$.subscribe(x => console.log('B' + x));
}, 400);

//---- Output ----
A0-124
A1-123
A2-51

The same scenario, this time using share():
// Create a source that emits three random values, one every 100 ms, then completes
var source$ = Rx.Observable.interval(100).take(3).map(index => `${index}-${Math.floor(50 + Math.random() * 100)}` );

// Create a shared stream
var shared$ = source$.share();

shared$.subscribe(x => console.log('A' + x));

// Simulate a subscriber that arrives after a delay
setTimeout(() => {
  // By this time the underlying source has completed but
  // since it is shared, it will restart execution when this
  // subscriber arrives
  shared$.subscribe(x => console.log('B' + x));
}, 500);

// ---- Output ----
A0-122
A1-100
A2-89
B0-64
B1-51
B2-112
As the examples show, with share() B receives a whole new set of values, whereas with publish().refCount() B receives nothing. The difference is subtle, yet important. It arises because observable.publish().refCount() uses a single underlying subject: once the source observable completes, that subject is completed too, and a completed subject cannot restart, so future subscribers receive no data. observable.share(), by contrast, uses a subject factory: if the source completes, a new subject is created for the next subscriber and resubscribes to the source, so future subscribers receive data from a new execution of the source.
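
To make the single-subject versus subject-factory distinction concrete, here is a minimal, dependency-free sketch. It does not use RxJS or reproduce its internals: TinySubject, multicast and collect are hypothetical names invented for this illustration, and the source is a synchronous 0, 1, 2 sequence rather than an interval.

```javascript
// Minimal stand-in for an Rx subject: multicasts until it completes.
class TinySubject {
  constructor() {
    this.observers = [];
    this.stopped = false;
  }
  subscribe(observer) {
    if (this.stopped) {
      // A completed subject only signals completion; it never emits again.
      observer.complete();
      return;
    }
    this.observers.push(observer);
  }
  next(value) {
    if (!this.stopped) this.observers.forEach(o => o.next(value));
  }
  complete() {
    if (this.stopped) return;
    this.stopped = true;
    this.observers.forEach(o => o.complete());
    this.observers = [];
  }
}

// Hypothetical helper: multicasts a synchronous 0, 1, 2 source through
// whatever subject getSubject() hands back.
function multicast(getSubject) {
  let subject = null;
  let executions = 0;
  return {
    subscribe(observer) {
      const needsSubject = subject === null || subject.stopped;
      if (needsSubject) subject = getSubject();
      subject.subscribe(observer);
      // Run the source only if there is a live subject to feed.
      if (needsSubject && !subject.stopped) {
        executions += 1;
        const target = subject;
        [0, 1, 2].forEach(i => target.next(`${i}-run${executions}`));
        target.complete();
      }
    }
  };
}

// Subscribes A, then B; B arrives after the source has completed.
function collect(shared) {
  const log = [];
  const observer = label => ({
    next: value => log.push(label + value),
    complete: () => {}
  });
  shared.subscribe(observer('A'));
  shared.subscribe(observer('B'));
  return log;
}

const single = new TinySubject();
const publishLike = collect(multicast(() => single));          // one subject, reused
const shareLike = collect(multicast(() => new TinySubject())); // fresh subject on demand

console.log(publishLike.join(' ')); // A0-run1 A1-run1 A2-run1
console.log(shareLike.join(' '));   // A0-run1 A1-run1 A2-run1 B0-run2 B1-run2 B2-run2
```

With the reused subject, B is handed an already completed subject and only sees the completion signal. With the factory, B gets a fresh subject and the source runs a second time.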

Scenario 2

Late subscription before the underlying observable completes, but after the subscriber count has dropped to 0
// Create a source that emits three random values, one every 100 ms, then completes
var source$ = Rx.Observable.interval(100).take(3).map(index => `${index}-${Math.floor(50 + Math.random() * 100)}` );

// Create a refcounted stream
var refcounted$ = source$.publish().refCount();

var subscription = refcounted$.subscribe(x => console.log('A' + x));
setTimeout(() => {
  // This drops the subscriber count to 0.
  subscription.unsubscribe();
}, 150);

// Simulate a subscriber that arrives after a delay
setTimeout(() => {
  // The underlying source had a subscriber count of 0 at this time
  // but it was still not complete when B subscribed
  refcounted$.subscribe(x => console.log('B' + x));
}, 250);

// ---- Output ----
A0-120
B0-65
B1-143
B2-63

Notice that in the above case, even though subscriber B joined late, only 50 milliseconds before the source would have completed, the source was re-executed and B received a whole new set of random values. The same happens with share() observables.
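
The restart in this scenario comes from reference counting rather than from the subject. The following is a rough, dependency-free sketch of that idea, not RxJS's actual implementation: refCounted, startSource and tickAll are hypothetical helpers, and calling tickAll() by hand stands in for the 100 ms interval.

```javascript
// Hypothetical helper: runs startSource only while at least one subscriber exists.
function refCounted(startSource) {
  let listeners = [];
  let stop = null;
  return {
    subscribe(listener) {
      listeners.push(listener);
      if (listeners.length === 1) {
        // Count went from 0 to 1: start a new execution of the source.
        stop = startSource(value => listeners.forEach(l => l(value)));
      }
      // Return an unsubscribe function.
      return () => {
        listeners = listeners.filter(l => l !== listener);
        if (listeners.length === 0 && stop !== null) {
          // Count dropped to 0: tear the running execution down.
          stop();
          stop = null;
        }
      };
    }
  };
}

// Hypothetical manual source: each execution counts from 0 and is advanced
// by calling tickAll(), which stands in for the 100 ms interval.
let executionCount = 0;
const ticks = [];
function startSource(emit) {
  executionCount += 1;
  const run = executionCount;
  let index = 0;
  let active = true;
  ticks.push(() => { if (active) emit(`${index++}-run${run}`); });
  return () => { active = false; };
}
const tickAll = () => ticks.forEach(tick => tick());

const output = [];
const counted = refCounted(startSource);

const unsubscribeA = counted.subscribe(value => output.push('A' + value));
tickAll();        // A receives the first value
unsubscribeA();   // subscriber count is now 0; the source had not completed

counted.subscribe(value => output.push('B' + value));
tickAll();
tickAll();
tickAll();

console.log(output.join(' ')); // A0-run1 B0-run2 B1-run2 B2-run2
```

Because the first execution is torn down when the count reaches 0, B's subscription starts a second execution from index 0, which mirrors the output shape shown above.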

Conclusion

  • A refcounted observable will not re-execute the source for a subscriber that arrives after the source has completed.
  • A shared observable will re-execute the source for a subscriber that arrives after the source has completed.
  • Both shared and refcounted observables re-execute the source if the subscriber count drops to 0 and a new subscriber arrives before the source has had a chance to complete.

References

http://reactivex.io/documentation/operators/publish.html
https://github.com/ReactiveX/RxJS/issues/1363
https://github.com/ReactiveX/rxjs/issues/453#issuecomment-153806139


