Most of the time you want to do more than one thing with the data you have. This post shows how to share that data across multiple observables using RxJS multicasting.

For more examples of RxJS multicasting, go to this learn-rxjs site.

Say we have an API call that fetches the data of a customer. In the template we want to show both the name and the age of the customer. Here is some code that might seem OK at first.

ngOnInit() {
   this.customer$ = this.http.get(`api/customers/1`);
   this.name$ = this.customer$.pipe(
      map(customer => customer.name)
   );
   this.age$ = this.customer$.pipe(
      map(customer => customer.age)
   );
}

Subscribing to the name$ and age$ observables results in two network calls to the API. From this we conclude that http.get() gives us a cold observable: every subscriber triggers its own execution of the source. Hot observables, by contrast, multicast their events to all subscribers. So in this example we must do something smarter to share the data between both observables.
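
To see the duplicate work without a backend, here is a minimal sketch in which a hand-written observable stands in for http.get(). The requests counter and the sample customer are assumptions made for illustration; they are not part of the original component.

```typescript
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

interface Customer {
  name: string;
  age: number;
}

// Hypothetical stand-in for http.get(): counts how often the "request" runs.
let requests = 0;
const customer$ = new Observable<Customer>(subscriber => {
  requests++;
  subscriber.next({ name: 'Ada', age: 36 });
  subscriber.complete();
});

const name$ = customer$.pipe(map(customer => customer.name));
const age$ = customer$.pipe(map(customer => customer.age));

const seen: Array<string | number> = [];
name$.subscribe(name => seen.push(name));
age$.subscribe(age => seen.push(age));

console.log(requests); // 2, one "request" per subscription
console.log(seen);     // [ 'Ada', 36 ]
```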

Operator: refCount

The solution lies in making the cold observable multicast. There is an operator for this: multicast.

this.customer$ = this.http.get(`api/customers/1`).pipe(
  multicast(new Subject())
);

...

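// pipe() is typed as returning a plain Observable, so cast before calling connect
(this.customer$ as ConnectableObservable<any>).connect();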

But to make this work we have to manually call the connect method.
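
The effect of the manual connect can be sketched outside Angular as follows. The counting source and the sample customer are hypothetical stand-ins for the HTTP call, and the example assumes RxJS 6 or 7, where multicast is still exported from 'rxjs/operators'.

```typescript
import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { map, multicast } from 'rxjs/operators';

interface Customer {
  name: string;
  age: number;
}

// Hypothetical stand-in for http.get(): counts how often the "request" runs.
let requests = 0;
const source$ = new Observable<Customer>(subscriber => {
  requests++;
  subscriber.next({ name: 'Ada', age: 36 });
  subscriber.complete();
});

// pipe() is typed as returning a plain Observable, hence the cast.
const customer$ = source$.pipe(
  multicast(new Subject<Customer>())
) as ConnectableObservable<Customer>;

const seen: Array<string | number> = [];
customer$.pipe(map(customer => customer.name)).subscribe(name => seen.push(name));
customer$.pipe(map(customer => customer.age)).subscribe(age => seen.push(age));

const requestsBeforeConnect = requests; // still 0: nothing runs until connect()
customer$.connect();

console.log(requestsBeforeConnect, requests); // 0 1
console.log(seen);                            // [ 'Ada', 36 ]
```

Both subscribers are attached to the Subject before connect() runs, so the single "request" reaches both of them.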

The refCount operator takes care of this: it connects automatically when the first subscriber arrives, keeps count of all further subscribers, and disconnects when the count drops to zero.

this.customer$ = this.http.get(`api/customers/1`).pipe(
  multicast(new Subject()),
  refCount()
);
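
Here is a small sketch of that counting behaviour. A Subject named response plays the role of a pending HTTP response that we resolve by hand; it and the two counters are assumptions for illustration only.

```typescript
import { Observable, Subject } from 'rxjs';
import { map, multicast, refCount } from 'rxjs/operators';

interface Customer {
  name: string;
  age: number;
}

// Hypothetical stand-in for a pending HTTP response that we resolve by hand.
const response = new Subject<Customer>();
let requests = 0;
let teardowns = 0;
const source$ = new Observable<Customer>(subscriber => {
  requests++;
  const inner = response.subscribe({
    next: value => subscriber.next(value),
    complete: () => subscriber.complete(),
  });
  // Runs when refCount disconnects from the source.
  return () => {
    teardowns++;
    inner.unsubscribe();
  };
});

const customer$ = source$.pipe(
  multicast(new Subject<Customer>()),
  refCount()
);

const seen: Array<string | number> = [];
const nameSub = customer$.pipe(map(c => c.name)).subscribe(name => seen.push(name)); // count 1: connects
const ageSub = customer$.pipe(map(c => c.age)).subscribe(age => seen.push(age));     // count 2: joins

response.next({ name: 'Ada', age: 36 });

nameSub.unsubscribe();                  // count 1: still connected
const teardownsWithOneLeft = teardowns; // 0
ageSub.unsubscribe();                   // count 0: refCount disconnects

console.log(requests, seen);                  // 1 [ 'Ada', 36 ]
console.log(teardownsWithOneLeft, teardowns); // 0 1
```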

Operators: publish and share

publish() === multicast(new Subject())

This can be substituted in the example above and saves some typing. It is often combined with refCount to take care of the automatic connecting and disconnecting.

share() === multicast(() => new Subject()) + refCount()

You might think this is exactly publish + refCount, but there is a subtle difference: share uses a factory function to get its Subject. This changes the behaviour for new subscribers once the source observable has completed.

publish + refCount : If the source has completed, every new subscriber only receives the complete notification. If the source hasn’t completed but the subscriber count had dropped to zero, the same Subject re-subscribes to the source.

share : A new subscriber that arrives when there is no active connection (after completion, or after the count dropped to zero) subscribes to the source again using a new Subject.

Let’s expand our example so that we can subscribe to the customer observable again (with the update function) after it completes.

ngOnInit() {
   this.customer$ = this.http.get(`api/customers/1`).pipe(
      share()
   );
   this.name$ = this.customer$.pipe(
      map(customer => customer.name)
   );
   this.age$ = this.customer$.pipe(
      map(customer => customer.age)
   );
}

update() {
  this.name$ = this.customer$.pipe(
    map(customer => customer.name + ' updated')
  );
  this.age$ = this.customer$.pipe(
    map(customer => customer.age + ' updated')
  );
}

When the update function is called and the new observables are subscribed to, the factory function of share creates a new Subject and subscribes it to the source observable. So a new request is sent to the API.

this.customer$ = this.http.get(`api/customers/1`).pipe(
  publish(),
  refCount()
);

In the above example, calling the update function does not create a new Subject. Subscribers only get a complete notification, so no new request is sent to the API.
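
The two behaviours can be compared side by side in a small sketch. fakeRequest and subscribeTwice are hypothetical helpers: the first stands in for the HTTP call and counts requests, the second subscribes once and then again after the source has completed.

```typescript
import { Observable } from 'rxjs';
import { publish, refCount, share } from 'rxjs/operators';

// Hypothetical stand-in for http.get(): emits one value, completes, counts requests.
function fakeRequest(stats: { requests: number }): Observable<string> {
  return new Observable<string>(subscriber => {
    stats.requests++;
    subscriber.next('customer');
    subscriber.complete();
  });
}

// Subscribes twice in a row; the second subscription starts after completion.
function subscribeTwice(shared$: Observable<string>): string[] {
  const log: string[] = [];
  shared$.subscribe({
    next: value => log.push(`first: ${value}`),
    complete: () => log.push('first: complete'),
  });
  shared$.subscribe({
    next: value => log.push(`late: ${value}`),
    complete: () => log.push('late: complete'),
  });
  return log;
}

const shareStats = { requests: 0 };
const shareLog = subscribeTwice(fakeRequest(shareStats).pipe(share()));

const publishStats = { requests: 0 };
const publishLog = subscribeTwice(
  fakeRequest(publishStats).pipe(publish<string>(), refCount())
);

console.log(shareStats.requests, shareLog);
// 2 [ 'first: customer', 'first: complete', 'late: customer', 'late: complete' ]
console.log(publishStats.requests, publishLog);
// 1 [ 'first: customer', 'first: complete', 'late: complete' ]
```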

Replay subject

publishReplay(n) === multicast(new ReplaySubject(n))

We can use different kinds of Subject inside multicast. Here ReplaySubject(n) keeps the last n emitted values in its buffer and replays them to new subscribers.

Just as with plain publish, there is a counterpart: shareReplay. Here is an example:

this.customer$ = this.http.get(`api/customers/1`).pipe(
   shareReplay({refCount: true, bufferSize: 1})
);
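
As a rough illustration of what this buys us in the customer example, the sketch below uses a hypothetical counting source instead of http.get(). It assumes RxJS 6.4 or later, where shareReplay accepts a config object.

```typescript
import { Observable } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';

interface Customer {
  name: string;
  age: number;
}

// Hypothetical stand-in for http.get(): counts how often the "request" runs.
let requests = 0;
const customer$ = new Observable<Customer>(subscriber => {
  requests++;
  subscriber.next({ name: 'Ada', age: 36 });
  subscriber.complete();
}).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const seen: Array<string | number> = [];
customer$.pipe(map(c => c.name)).subscribe(name => seen.push(name)); // runs the request
customer$.pipe(map(c => c.age)).subscribe(age => seen.push(age));    // served from the buffer

console.log(requests); // 1
console.log(seen);     // [ 'Ada', 36 ]
```

The second subscriber arrives after the source has completed and still gets the customer, because the completed ReplaySubject keeps its buffer.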

The difference between publishReplay + refCount and shareReplay is similar to that of their plain versions, with one exception: shareReplay keeps its ReplaySubject once the source has completed.

publishReplay(n) + refCount() : Any new subscriber gets the last n values from the ReplaySubject. If the source hasn’t completed yet and the subscriber count had dropped to zero, the same ReplaySubject re-subscribes to the source.

shareReplay({refCount: true, bufferSize: n}) : If the source has completed, new subscribers get the last n values from the ReplaySubject. If the source hasn’t completed and the subscriber count had dropped to zero, a new subscriber only subscribes to the source again using a new ReplaySubject, so it gets no buffered values.

So both replay the buffered value(s) after completion. They differ when the count drops to zero before the source completes: publishReplay + refCount keeps its buffer, while shareReplay with refCount: true starts over with a new ReplaySubject.
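
A minimal sketch of the case where the subscriber count drops to zero before the source completes. numberedSource and secondSubscriberSees are hypothetical helpers written for this comparison, and the example assumes RxJS 6.4 or later.

```typescript
import { Observable } from 'rxjs';
import { publishReplay, refCount, shareReplay } from 'rxjs/operators';

// Hypothetical source: emits one numbered value per subscription and never completes.
function numberedSource(): Observable<string> {
  let subscriptions = 0;
  return new Observable<string>(subscriber => {
    subscriptions++;
    subscriber.next(`value ${subscriptions}`);
  });
}

// A first subscriber comes and goes (count drops to zero), then a second one arrives.
function secondSubscriberSees(shared$: Observable<string>): string[] {
  shared$.subscribe().unsubscribe();
  const seen: string[] = [];
  shared$.subscribe(value => seen.push(value)).unsubscribe();
  return seen;
}

const withPublishReplay = secondSubscriberSees(
  numberedSource().pipe(publishReplay(1), refCount())
);
const withShareReplay = secondSubscriberSees(
  numberedSource().pipe(shareReplay({ refCount: true, bufferSize: 1 }))
);

console.log(withPublishReplay); // [ 'value 1', 'value 2' ]
console.log(withShareReplay);   // [ 'value 2' ]
```

In both cases the source is subscribed to a second time. Only the publishReplay variant still hands the old buffered value to the second subscriber.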

Now you may have noticed the other option in the shareReplay code. Setting refCount: false means the ReplaySubject is not disconnected from the source when it has no more subscribers. New subscribers then get the buffered values from the same ReplaySubject, and the source is not subscribed to again.

shareReplay(n) === shareReplay({refCount: false, bufferSize: n})
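
To illustrate, here is a sketch with a hypothetical long-lived source that counts subscriptions and teardowns. It assumes RxJS 6.4 or later, where the numeric shorthand behaves like refCount: false.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let subscriptions = 0;
let teardowns = 0;
// Hypothetical long-lived source: emits once per subscription and never completes.
const source$ = new Observable<string>(subscriber => {
  subscriptions++;
  subscriber.next(`value ${subscriptions}`);
  return () => {
    teardowns++;
  };
});

const cached$ = source$.pipe(shareReplay(1)); // same as { refCount: false, bufferSize: 1 }

cached$.subscribe().unsubscribe(); // count drops to zero, the source stays subscribed
const late: string[] = [];
cached$.subscribe(value => late.push(value)).unsubscribe();

console.log(late);                     // [ 'value 1' ]
console.log(subscriptions, teardowns); // 1 0
```

The flip side is that the source subscription is never torn down here, which matters for sources that do not complete on their own.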

Behavior subject

publishBehavior(default_value) === multicast(new BehaviorSubject(default_value))

With publishBehavior(default_value), subscribers get default_value if the source has not emitted any values yet. If the source hasn’t completed yet, new subscribers get the last value from the BehaviorSubject, and the same BehaviorSubject re-subscribes to the source when needed. If the source has completed, all new subscribers get only the complete notification.
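
The three cases can be sketched with a plain Subject as the source. The default value 0 and the emitted numbers are arbitrary choices for illustration.

```typescript
import { Subject } from 'rxjs';
import { publishBehavior, refCount } from 'rxjs/operators';

const source = new Subject<number>();
const value$ = source.pipe(publishBehavior(0), refCount()); // 0 is the default value

const first: number[] = [];
const second: number[] = [];
const late: string[] = [];

value$.subscribe(value => first.push(value));  // gets the default right away
source.next(1);
value$.subscribe(value => second.push(value)); // gets the last value, 1
source.next(2);
source.complete();
value$.subscribe({
  next: value => late.push(`value ${value}`),
  complete: () => late.push('complete'),
});

console.log(first);  // [ 0, 1, 2 ]
console.log(second); // [ 1, 2 ]
console.log(late);   // [ 'complete' ]
```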

Async subject

publishLast() === multicast(new AsyncSubject())

No matter how many subscribers are connected to the AsyncSubject, it won’t emit any value until the source completes. Side effects in the source pipeline are still executed for every value, though. When the source completes, the AsyncSubject completes too and emits the last value plus the complete notification to all subscribers, current and new.
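
A short sketch of this, again with a plain Subject as the source. The tap counter is a hypothetical side effect added only to show that the source values are still processed.

```typescript
import { Subject } from 'rxjs';
import { publishLast, refCount, tap } from 'rxjs/operators';

const source = new Subject<number>();
let sideEffects = 0;
const last$ = source.pipe(
  tap(() => sideEffects++), // side effect in the source pipeline
  publishLast<number>(),
  refCount()
);

const early: Array<number | string> = [];
const late: Array<number | string> = [];

last$.subscribe({
  next: value => early.push(value),
  complete: () => early.push('complete'),
});
source.next(1);
source.next(2);
const emittedBeforeComplete = early.length; // 0: nothing is delivered yet
source.next(3);
source.complete();
last$.subscribe({
  next: value => late.push(value),
  complete: () => late.push('complete'),
});

console.log(emittedBeforeComplete, sideEffects); // 0 3
console.log(early);                              // [ 3, 'complete' ]
console.log(late);                               // [ 3, 'complete' ]
```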

Hopefully you can now share your data the way you like with RxJS multicasting. Happy sharing!