Implement RxJS `mergeMap` through inner Observables to Subscribe and Pass Values Through

Instructor: John Lindquist
Published 6 years ago
Updated 5 years ago

Understanding sources and subscribers makes it much easier to understand what's going on with mergeMap under the hood. Where a typical operator invokes destination.next directly, mergeMap wraps destination.next inside of a new source/subscriber combo so there's an "outer" next and an "inner" next.

Instructor: [00:00] When I click on the document, this is just going to increment a value, starting with zero and then going up from there. Click, click, click, click. That's one, two, three, four, each one being pushed into next. That value is just coming from there.

[00:15] There are plenty of scenarios where, inside of next, you might think, "I should create a new observable here." Say I'll create one with of, passing in the value, then I'll just delay that.

[00:29] Let's import delay as well. Delay that 500 milliseconds. I need to subscribe here as well. Then I'll create a subscriber with a next and then just log out the innerValue.

[00:48] When I click, you'll see one, then inner one. Click, click. Two, three, inner two, inner three. Click, click, click. Each of these outer values comes through first, then creates this inner observable and subscribes to it.
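The nested-subscribe pattern described above can be sketched without RxJS at all. This is a minimal sketch under stated assumptions: `Source`, `ofValue`, and `counter` are hypothetical stand-ins (not the RxJS API), the click stream is replaced by a fixed 1, 2, 3, and the 500 ms delay is left out, so each inner value arrives immediately after its outer value rather than later.

```typescript
// Hypothetical stand-in for an observable: a function that accepts a `next` callback.
// This is NOT the RxJS API, only enough structure to show the nesting.
type Next<T> = (value: T) => void;
type Source<T> = (next: Next<T>) => void;

// Stand-in for `of(value)`: emits a single value synchronously.
// (The lesson also applies delay(500); that is omitted here.)
const ofValue = <T>(value: T): Source<T> => (next) => next(value);

// Stand-in for the click counter: emits 1, 2, 3.
const counter: Source<number> = (next) => [1, 2, 3].forEach((n) => next(n));

const log: string[] = [];

// The nested subscribe: a second "subscribe" inside the outer next.
counter((value) => {
  log.push(`outer ${value}`);
  ofValue(value)((innerValue) => {
    log.push(`inner ${innerValue}`);
  });
});

console.log(log.join(", "));
// prints "outer 1, inner 1, outer 2, inner 2, outer 3, inner 3"
```

The shape to notice is the subscription created inside another subscription's next, which is what the operator in the rest of the lesson takes over.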

[01:02] This is not the recommended approach. What you would actually do with RxJS is create a mergeMap operator. mergeMap, I'll import that.

[01:15] Then mergeMap will take that value. I'm just going to paste what I cut out of there and delete the subscribe block.

[01:25] Hit save. Let it format. You can see that that value, which before was being pushed through this next is now being pushed through mergeMap. mergeMap itself is going to merge into this observable and then push those values down here.

[01:40] If I click, click, click, you see those values come out delayed. Click, click, click -- four, five, six. This inner observable is now passing along those values to this next, where before they were going directly from next to here.

[01:57] Now there's an inner next call which is passing it out of this mergeMap and into this and subscribing to this and passing those values down to next.

[02:05] To create my own myMergeMap, this will take a function. You can see this function here and the source. We'll do the source.lift trick where we pass in an object that has a call method, so we have that subscriber and the source.

[02:23] Then we want to call source.subscribe and create a subscriber, which we'll pass this subscriber into. We'll create our class of myMergeMapSubscriber and extend Subscriber, and then create a new instance of myMergeMapSubscriber and pass in this subscriber.

[02:50] We'll also want to pass in this function -- there's that function -- and create our constructor to handle that. We have that subscriber and a function. We'll say, "super(subscriber)." Let's say this.function is that function we passed in.

[03:07] From here, we can implement our next. The value coming in is from each click, but it's being piped through scan, so it's not the click event. Before scan it would be the click event, but this way we're getting that incremented value.

[03:21] This will be one, two, three, four, five. What I want to do here is invoke that function with this value. This will be our new observable. Essentially, this function coming into here is almost like a little factory which is going to create these observables each time this comes through.

[03:40] We call next. We get this one, two, three, four, five. We pass that in and create an observable.

[03:46] Simply enough, this will look like the subscribe block from the beginning of the lesson. We have subscribe and then a next and a value.

[03:55] Then we'll just say "this.destination.next(value)." The last thing to do is use myMergeMap and pop it in here. Hit save. It'll format a bit.
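The pieces built up so far can be put together in one place. This is a minimal sketch, assuming a hand-rolled `TinyObservable` and `Observer` instead of the RxJS `Observable`, `Subscriber`, and `source.lift` used in the video, so it runs without any dependencies. The class is written as `MyMergeMapSubscriber` here, and `ofValue` is a hypothetical stand-in for `of` with the delay omitted.

```typescript
// Minimal stand-ins, NOT the RxJS classes: just enough to mirror the lesson's shape.
interface Observer<T> {
  next(value: T): void;
}

class TinyObservable<T> {
  private producer: (observer: Observer<T>) => void;
  constructor(producer: (observer: Observer<T>) => void) {
    this.producer = producer;
  }
  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }
}

// Stand-in for `of(value)`: emits one value synchronously.
const ofValue = <T>(value: T) => new TinyObservable<T>((o) => o.next(value));

// Receives each outer value, calls the factory function to build an inner
// observable, subscribes to it, and forwards inner values to the destination.
class MyMergeMapSubscriber<T, R> implements Observer<T> {
  private destination: Observer<R>;
  private fn: (value: T) => TinyObservable<R>;
  constructor(destination: Observer<R>, fn: (value: T) => TinyObservable<R>) {
    this.destination = destination;
    this.fn = fn;
  }
  next(value: T): void {
    const inner = this.fn(value);
    inner.subscribe({
      next: (innerValue) => this.destination.next(innerValue),
    });
  }
}

// The operator: takes the factory function and returns source -> new source.
const myMergeMap =
  <T, R>(fn: (value: T) => TinyObservable<R>) =>
  (source: TinyObservable<T>) =>
    new TinyObservable<R>((subscriber) =>
      source.subscribe(new MyMergeMapSubscriber(subscriber, fn))
    );

// Usage: a fixed 1, 2, 3 stands in for the scanned click counter.
const source = new TinyObservable<number>((o) => [1, 2, 3].forEach((n) => o.next(n)));
const received: string[] = [];
myMergeMap((n: number) => ofValue(`inner ${n}`))(source).subscribe({
  next: (v) => received.push(v),
});

console.log(received.join(", "));
// prints "inner 1, inner 2, inner 3"
```

The final subscriber only ever sees inner values. The outer values stop at `MyMergeMapSubscriber.next`, where they are handed to the factory function.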

[04:09] When I click, you can see that delayed one, two, three, four, five and can see those are being delayed because each time I click, it increments. Those nexts are going into here.

[04:24] It's using this function that we passed into myMergeMap, which is creating these observables. We can just subscribe to that. When that observable says next, we send that value along to the destination -- again, that destination being the original subscriber.

[04:44] If it helps to visualize this a bit, I can say, "console.log(outerValue)" and "console.log(innerValue)." Let me indent that a little bit. Hit save. I'll click. Outer one, inner one -- click, click. Outer two, three. Inner two, three.

[05:07] The outers, represented by the clicks, map to the one, two, three. The inners are the values being passed along through this of and delay observable.
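The outer-then-inner ordering can be reproduced in a small sketch. It assumes a function-based stand-in for observables rather than RxJS, and a hypothetical `delayedOf` whose emissions are held in a queue until `flush()` is called. That queue stands in for `delay(500)` so the ordering is deterministic without real timers.

```typescript
// Hypothetical stand-in for an observable: a function that accepts a `next` callback.
type Next<T> = (value: T) => void;
type Source<T> = (next: Next<T>) => void;

// Stand-in for `of(value).pipe(delay(500))`: emissions are queued, not timed,
// and are released only when `flush()` runs.
const pending: Array<() => void> = [];
const delayedOf = <T>(value: T): Source<T> => (next) => {
  pending.push(() => next(value));
};
const flush = (): void => {
  while (pending.length > 0) pending.shift()!();
};

const log: string[] = [];

// Function-style myMergeMap with the two log statements from the lesson.
const myMergeMap =
  <T, R>(fn: (value: T) => Source<R>) =>
  (source: Source<T>): Source<R> =>
  (next) =>
    source((outerValue) => {
      log.push(`outer ${outerValue}`);
      fn(outerValue)((innerValue) => {
        log.push(`inner ${innerValue}`);
        next(innerValue); // the inner next hands the value to the destination
      });
    });

// Three quick "clicks" before any delayed value is released.
const clicks: Source<number> = (next) => [1, 2, 3].forEach((n) => next(n));
const results: number[] = [];
myMergeMap((n: number) => delayedOf(n))(clicks)((v) => results.push(v));

const beforeFlush = [...log]; // only outer values so far
flush(); // release the "delayed" inner values

console.log(beforeFlush.join(", ")); // prints "outer 1, outer 2, outer 3"
console.log(log.join(", "));
// prints "outer 1, outer 2, outer 3, inner 1, inner 2, inner 3"
```

With a real timer the inner lines would interleave according to click timing, which is why a slow single click shows outer one, inner one, while quick clicks show several outers before their inners.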

J. Matthew
~ 5 years ago

mergeMap makes sense now!

I see how, whereas map directly transforms a given value in the stream, mergeMap creates a new stream (Observable) from that value, and then merges the output from the new stream into the old stream (or replaces the old stream with the new, depending on how you want to look at it).

delay is obviously a contrived example (albeit an effective one), but it's easy to see how critical this would be for practical asynchronous actions, like HTTP requests.
