December 23, 2021

Angular - Using RxJS Operators mergeMap and concatMap

The Angular MergeMap and ConcatMap are used to map each value from the source observable into an inner observable. It internally subscribes to the source observable, and then starts emitting the values from it, in palce of the original value. A new inner observable will be created for every value it receives from the Source. It merges the values from all of its inner observables and emits the values back into the stream.

Difference between MergeMap and ConcatMap is that ConcatMap maintains the order of its inner observables, while MergeMap can process the source observables in any order depending on the execution time-period of each observable.

ConcatMap operator

ConcatMap processes the source observables in a serialized fashion waiting for each one to complete before moving to the next.

Lets see any example:

////on the top of the file, import the operator
//import { concatMap } from 'rxjs/operators';

//an observable of numbers (milliseconds), we use as its values to cause the delay 
const source = of(2000, 1000);

// map value from source into inner observable, once its completes, then it will move to next value in the source observable
const newSource = source.pipe(
  concatMap(val => of(`Delayed by: ${val} ms`).pipe(delay(val))) //creates a new observable, the following section is actually subscribe to this observable
);

//subscribe to the new observable (internally created by concatMap)
const subscribe = newSource.subscribe(val =>
  console.log(`With concatMap: ${val}`)
);

This code is performing the following actions:

  • A source observable is defined with two values 2000 and 1000 representing milliseconds.
  • concatMap is used to receive values from source and emit its own values from the new observable. Before emitting the value, the inner observable is calling the delay function to simulate delay in execution. As per the values provided in the source observable, first value cause delay for 1 second and second value will cause a delay for 2 seconds.
  • The new observable is assigned to the variable newSource.
  • In the end, we subscribe to the newSource observable, and write the output to the console.

Here is the sample output from above code:

With concatMap: Delayed by: 2000 ms 
With concatMap: Delayed by: 1000 ms

From this output, its clear that the concatMap will keep the original order of values emitted from the source. The values are being displayed in the same order as we supplied in the source observable. Even the second value has shorter delay of 1 second, but it will wait for the first value (with longer delay of 2 seconds) to complete before moving to next value in the source observable.

The concatMap assures the original sequence of values. If we have multiple inner Observables, the values will be processed in sequential order. Next value will only be processed after the previous one is completed.

MergeMap operator

MergeMap is similar to the contactMap with one difference that it processes the source observables without any assurance of the order of provided values.

Lets see any example, we use the same source observable (used in above exmaple):

////on the top of the file, import the operator
//import { mergeMap } from 'rxjs/operators';

//an observable of numbers (milliseconds), we use as its values to cause the delay 
const source = of(2000, 1000);

// map value from source into inner observable, it will move to next value (whenever available) in the source observable, will not wait for the previous one to complete
const newSource = source.pipe(
  mergeMap(val => of(`Delayed by: ${val} ms`).pipe(delay(val))) //creates a new observable, the following section is actually subscribe to this observable
);

//subscribe to the new observable (internally created by mergeMap)
const subscribe = newSource.subscribe(val =>
  console.log(`With mergeMap: ${val}`)
);

This code is performing the following actions:

  • A source observable is defined with two values 2000 and 1000 representing milliseconds.
  • mergeMap is used to receive values from source and emit its own values from the new observable. Before emitting the value, the inner observable is calling the delay function to simulate delay in execution. As per the values provided in the source observable, first value cause delay for 1 second and second value will cause a delay for 2 seconds.
  • The new observable is assigned to the variable newSource.
  • In the end, we subscribe to the newSource observable, and write the output to the console.

Here is the sample output from above code:

With mergeMap: Delayed by: 1000 ms 
With mergeMap: Delayed by: 2000 ms

From this output, we know that the mergeMap do not keep the original order of values emitted from the source. The values can be displayed in the order of their execution time-period. The sooner the value is processed, it will be emitted to the subscription. Since the second value has shorter delay of 1 second, it does not wait for the first value (with greater delay of 2 seconds) to complete, and hence it gets processed before the first.

The mergeMap does not assure the original sequence of values. If we have multiple inner Observables, the values may be overlapped over time, because values will be emitted in parallel.

References:

Related Post(s):

December 16, 2021

Angular - Using RxJS Operators take and skip

take and skip operators are used to limit the number of values emitted from the source observable.

take operator

take operator returns the observable which will limit the number of values emitted and receive the first n number of values. It will take a count argument representing the max number of values expecting to be received. Usually it is being used by passing 1 as count argument, to take only the first value emitted from an observable. After receiving the n number of values, it will complete the observable, so any more values emitted after the first one will be ignored (or not received).

Lets see this example:

const sourceObservable = of(1, 2, 3, 4, 5);
const wrapperObserable = sourceObservable.pipe(take(1));
const subscribe = wrapperObserable.subscribe(val => console.log('Received Value: ' + val));

The output will be:

Received Value: 1

Here, we created a source observable which will emit values 1,2,3,4,5. Then we used the take operator with count argument as 1, and the wrapper observable will be able to emit only 1 value, hence the subscription will receive 1 value.

Lets change the count argument to 3:

const wrapperObserable = sourceObservable.pipe(take(1));
const subscribe = wrapperObserable.subscribe(val => console.log('Received Value: ' + val));

This time, the output will be:

Received Value: 1
Received Value: 2
Received Value: 3

Note that, we are not making any changes to the source observable, but we changed the count argument to the take operator to receive the desired number of values.

skip operator

skip operator also returns the observable which will limit the number of values emitted, but it works in opposite to the take operator. It will ignore the first n number of vaues and receive all of the remaining values. It will take a count argument representing the max number of values expecting to be skipped.

Lets see this example:

const sourceObservable = of(1, 2, 3, 4, 5);
const wrapperObserable = sourceObservable.pipe(skip(1));
const subscribe = wrapperObserable.subscribe(val => console.log('Received Value: ' + val));

The output will be:

Received Value: 2
Received Value: 3
Received Value: 4
Received Value: 5

Here, we created a source observable which will emit values 1,2,3,4,5. Then we used the skip operator with count argument as 1, and the wrapper observable will be able to skip 1 value, and the subscription will receive remaining all values 2,3,4,5.

Lets change the count argument to 3:

const wrapperObserable = sourceObservable.pipe(skip(1));
const subscribe = wrapperObserable.subscribe(val => console.log('Received Value: ' + val));

This time, the output will be:

Received Value: 4
Received Value: 5

References:

Related Post(s):