Using RxJS and Its Combination Operators

In one of our previous posts, we covered one of the ways we use Observables. In that post we briefly covered how we use Observable/BehaviorSubject to easily distribute AJAX data to Angular components for one of our clients.

But often times, we rely on more than one AJAX  call to load up a component. Due to the asynchronous nature of AJAX call we require ways to hold off on initializing the component until we receive all of the AJAX responses. In another words, we require ways to synchronize the AJAX responses.

A overwhelming list of combination operators can be found below:

http://reactivex.io/rxjs/manual/overview.html#combination-operators

As you can see there are a lot of combination operators and regrettably, I have not experimented with all of them. But I have selected a small number of them I have used and present them in a  list format:

If you’re thinking in the lines of:

I don’t care about all of the data in the stream. Just bring me the combination of latest values.

…then forkJoin method may be the one for you. The following link is a simple code that demonstrates the forkjoin for you to toy with:

http://jsbin.com/jerarut/edit?html,js,console,output

const getValue = (id) => document.getElementById(`input${id}`).value

const clickEvent = (id) => Rx.Observable
 .fromEvent(
 document.getElementById(id),'click', ()=> getValue(id)
 ).takeUntil(unsubscribe)

const unsubscribe = Rx.Observable
 .fromEvent(
 document.getElementById("end"),'click'
 )

const streamA = clickEvent('A');

const streamB = clickEvent('B');


const forkJoin = Rx.Observable.forkJoin(streamA, streamB)
 .subscribe(
 forkJoin => console.log('forkJoin', forkJoin)
 )

Taken from the official documentation, “first value is the last thing emitted by the first Observable, second value is the last thing emitted by the second Observable”.

In this example, we utilize takeUntil operator to signal streamA and streamB to close the stream at the click of ‘End’ button. When the ‘end’ is signaled, the forkJoin combines the latest values of the two stream and print them on console.

It’s important to remember that the forkJoin operator awaits for all the streams to emit values THEN combines the latest values; meaning the latest values will only be emitted ONCE. I make this important distinction because you don’t want to be confused with the upcoming operator (because I certainly did).

I need continuous update of all newest combinations of data as soon as they are emitted

If that is your need, then combineLatest operator is worth a a look. Because showing is much better than telling, please play around with the jsbin posted below:

http://jsbin.com/dojurot/2/edit?html,js,console,output

const getValue = (id) => document.getElementById(`input${id}`).value

const clickEvent = (id) => Rx.Observable
 .fromEvent(
 document.getElementById(id),'click', ()=> getValue(id)
 )

const unsubscribe = Rx.Observable
 .fromEvent(
 document.getElementById("end"), 'click', ()=> console.log('end clicked')
 )

const streamA = clickEvent('A');

const streamB = clickEvent('B');


const combineLatest = Rx.Observable
 .combineLatest(streamA, streamB)
 .subscribe(
 combined => console.log('combinedLatest', combined)
 )
It’s important to note that while forkJoin waits for all observables to complete emitting all values, combineLatest waits for all observables to emit its first values before it is invoked.

 

The next one may be my favorite because it fits my needs the most:

I need to properly group all data in parallel in the order they are emitted

From the wise words of the official document:

“Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.”

http://jsbin.com/dapideq/2/edit?html,js,console,output

const getValue = (id) => document.getElementById(`input${id}`).value

const clickEvent = (id) => Rx.Observable
 .fromEvent(
 document.getElementById(id),'click', ()=> getValue(id)
 )

const unsubscribe = Rx.Observable
 .fromEvent(
 document.getElementById("end"), 'click', ()=> console.log('end clicked')
 )

const streamA = clickEvent('A');

const streamB = clickEvent('B');


const zip = Rx.Observable.zip(streamA, streamB)
 .subscribe(
 zip => console.log('zip', zip)
 )
If the latest parameter is a function, its definition can be used to define HOW the emitted data can be paired up.

That sums up the main combination operators I use for the real life project. Although I intended this blog topic to be a very small one, I hope you find how useful these combination operators can be. Until next time!

 

 

 

Next Post

Comments

See how we can help

Lets talk!

Stay up to date on the latest technologies

Join our mailing list, we promise not to spam.