Lesson 2

An Introduction to RxJS

We have spent quite a bit of time diving into the theory behind observables. Now we are going to move up a level and just use RxJS observables. RxJS provides us with observables that we can use out of the box, along with a ton of helpful operators to handle just about any situation you might run into.

We already have a decent understanding of RxJS because of what we learned in the last lesson. After all, RxJS is basically observables plus extra stuff to help with observables. RxJS provides us with ready-to-go implementations of Observable and Subject so that we don’t need to build our own, along with a ton of other goodies. We’ve already run into one of those goodies: the BehaviorSubject, which is another type of observable. There is quite a bit more to cover, but once you are comfortable with even the basics of RxJS, it unlocks so much power. I learned RxJS many years into my programming career, and it is probably the thing that has had the most significant impact on my coding style and how much I enjoy building applications.

NOTE: I would highly recommend trying out the examples in this lesson yourself. You can do this within an example Angular application if you like, use the RxJS StackBlitz to play around, or even use the Node.js REPL as I describe in this video.

Creating Observables with RxJS

The way we create an Observable with RxJS is basically the same as using the Observable class we created in the last lesson:

import { Observable } from 'rxjs';

const myObservable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.next(4);
  subscriber.next(5);
});

And we use it in the same way:

const observer = {
    next: (data) => console.log('My next was called with', data),
    complete: () => console.log('The observable has finished emitting data'),
    error: (err) => console.log('The following error occurred', err)
};

myObservable$.subscribe(observer);

or more simply:

myObservable$.subscribe((data) => console.log(data));

We can manually create an observable with RxJS like this, but more typically we will either just be using an observable that has already been created for us (e.g. something we have received from Angular) or we will use a creation operator to create the observable stream for us.

Creation Operators

RxJS comes with over 100 operators that can create or transform observable streams for us. Again, not to make them seem more mystical than they are, an operator is really just a function. We have two types of operators in RxJS: creation operators that create new observable streams for us, and pipeable operators that take an existing observable as an input, and return a new observable.

We will look at more of these operators in detail in a later lesson, but let’s just focus on a couple of basic ones now.

Let’s look again at our earlier example of creating an observable that emits the numbers 1 through 5:

const myObservable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.next(4);
  subscriber.next(5);
});

We could simplify this a bit by doing this:

const myObservable$ = new Observable(subscriber => {
  for(let i = 1; i <= 5; i++){
    subscriber.next(i);
  }
});

Or, we could use the from creation operator from RxJS:

import { from } from 'rxjs';

const myObservable$ = from([1, 2, 3, 4, 5]);

This will give us the same result. When we subscribe to it, it will output:

1
2
3
4
5

The from creation operator can create an Observable from an array like above, but it can also automatically create an observable from any array-like object, a Promise, an iterable object, or an Observable-like object. Most commonly, in my code at least, I use from with arrays.

Another common creation operator is of:

import { of } from 'rxjs';

const myObservable$ = of([1, 2, 3, 4, 5]);

This is very similar, except it won’t turn the array into individual emissions on the stream. This will create a stream that just emits the entire array as a single value. If we subscribe to this, we would get:

[1, 2, 3, 4, 5]

Sometimes it is useful for something to be a stream of a value, rather than just being the value, and of is good for these cases. Maybe we just need a stream of a simple string value, and we could do this:

const myValue$ = of('some string');

Why we would want to do this might not be clear right now, but hopefully it will become clearer as we progress. There are other creation operators that are extremely useful, like combineLatest, but that is a bit more complex and we are going to get to it later.

The Pipe Method

As well as creation operators that create new observables for us, we also have other operators that can “transform” an existing observable. It is important to note that in this “transformation” the operator does not change the existing observable stream, it takes the existing observable stream as an input and returns a new stream.

These are the pipeable operators, so named because they are passed into the pipe method, which can be chained onto an observable like this:

import { map, filter } from "rxjs/operators"

myObservable$.pipe(
    map((value) => value * 2),
    filter((value) => value < 7)
)

Let’s assume that myObservable$ is the stream we created before that emits the numbers one through five in sequence:

1
2
3
4
5

If we subscribe to this new stream created by the pipe:

myObservable$.pipe(
    map((value) => value * 2),
    filter((value) => value < 7)
).subscribe((val) => console.log(val));

We won’t get the numbers 1 through to 5 being logged out to the console. Have a guess at what you think the result of this stream will be.

The result will be:

2
4
6

Let’s talk about exactly why. What is happening at a surface level is reasonably intuitive since RxJS operators have such a declarative syntax (more on that later), and the map and filter RxJS operators behave similarly to the map and filter array methods (only they act on data emitted from streams, not elements of arrays).

We start with our standard stream emissions:

1
2
3
4
5

Then we apply this operator to multiply all values by 2:

map((value) => value * 2)

Which results in:

2
4
6
8
10

And then we use this operator:

filter((value) => value < 7)

This lets through only the values less than 7, which results in:

2
4
6

But let’s talk about what is happening at a deeper level with the pipe method, as understanding this will become more important when you are composing streams together in more complex ways. Let’s take a look at this again:

import { map, filter } from "rxjs/operators"

myObservable$.pipe(
    map((value) => value * 2),
    filter((value) => value < 7)
).subscribe((val) => console.log(val));

When we subscribe to this, we aren’t subscribing to the myObservable$ stream directly. We are subscribing to the observable stream that the filter operator returns. When we subscribe to that stream, filter automatically subscribes to the stream returned by map so that it can get the values it needs to do its job. In turn, map needs the values from its input stream, which is our original myObservable$ stream, so map subscribes to myObservable$.

We have one long chain of subscriptions. We trigger the first subscription, which is on whatever stream the last operator returns, and then each operator subscribes to the stream above it until we get all the way to the input stream:

  • We subscribe to the stream returned by filter
  • filter subscribes to the stream returned by map
  • map subscribes to the input stream
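The chain above can be sketched without RxJS at all. The following is a deliberately simplified stand-in, not how RxJS is actually implemented: a stream here is just an object with a subscribe method, and simpleMap, simpleFilter, and simplePipe are hypothetical names invented for this illustration.

```javascript
// Record the order in which subscriptions happen
const log = [];

// Simplified input stream: emits 1 through 5 when subscribed to
const source$ = {
  subscribe(next) {
    log.push('the input stream is subscribed to');
    [1, 2, 3, 4, 5].forEach(next);
  },
};

// Simplified operators: each takes an input stream and returns a new stream
const simpleMap = (project) => (input$) => ({
  subscribe(next) {
    log.push('map subscribes to its input');
    input$.subscribe((value) => next(project(value)));
  },
});

const simpleFilter = (predicate) => (input$) => ({
  subscribe(next) {
    log.push('filter subscribes to its input');
    input$.subscribe((value) => {
      if (predicate(value)) next(value);
    });
  },
});

// Simplified pipe: feed each operator the stream returned by the previous one
const simplePipe = (input$, ...operators) =>
  operators.reduce((stream$, operator) => operator(stream$), input$);

const result$ = simplePipe(
  source$,
  simpleMap((value) => value * 2),
  simpleFilter((value) => value < 7)
);

// Nothing has been logged yet: no work happens until we subscribe
const values = [];
log.push('we subscribe to the stream returned by filter');
result$.subscribe((value) => values.push(value));

console.log(log);
console.log(values); // [ 2, 4, 6 ]
```

The log comes out in the same order as the list above: our subscription first, then filter, then map, and only then does the input stream begin emitting.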

We just have two operators piped on here, but it is not uncommon to have even 5 or more operators inside of the pipe. Another useful operator, especially for learning and debugging, is the tap operator:

import { map, filter, tap } from "rxjs/operators"

myObservable$.pipe(
    tap((value) => console.log("Before map: ", value)),
    map((value) => value * 2),
    tap((value) => console.log("Before filter: ", value)),
    filter((value) => value < 7)
).subscribe((val) => console.log("Stream emitted:", val));

The tap operator doesn’t modify the stream in any way: it passes along every value from the stream above it unchanged. It does, however, serve two purposes:

  • It allows us to see what the value is at that point within the pipe (great for debugging)
  • It allows us to execute arbitrary code. If we want to run some “side effect” (e.g. maybe we want to set some value outside of the stream) we can do that with a tap without actually impacting the stream itself

If our series of operators were not giving us the results we expected, we could add in a few tap operators as above, and we would see this logged out to the console:

Before map:  1
Before filter:  2
Stream emitted: 2
Before map:  2
Before filter:  4
Stream emitted: 4
Before map:  3
Before filter:  6
Stream emitted: 6
Before map:  4
Before filter:  8
Before map:  5
Before filter:  10

We can easily see here where the stream stops emitting values. The map, tap, and filter operators are perhaps the most commonly used operators, but there are a few more that we will also commonly use. We are going to go through some more of those, and look at some realistic applications of them, in the next lesson.

Recap

What are the two types of operators RxJS provides?

What does a creation operator do?

What does a pipeable operator do?

Which of the following is the correct usage of a pipeable operator?