An Introduction to RxJS
We have spent quite a bit of time diving into the theory behind Observables, now
we are going to go to a higher level and just use RxJS
observables. RxJS provides us with observables that we can use out of the box,
along with a ton of helpful operators
to handle just about any situation you
might run into.
We already have a decent understanding of RxJS because of what we learned in the
last lesson — after all, RxJS is basically observables + extra stuff to help
with observables. RxJS provides us with ready-to-go implementations of
Observable
and Subject
so that we don’t need to build our own, along with
a ton of other goodies. We’ve already run into one of those goodies so far, the
BehaviorSubject
which is another type of observable, but there is quite a bit
more to cover. Once you are comfortable even with the basics of RxJS it unlocks
so much power. I learned RxJS many years into my programming career, and it is
probably the thing that has had the most significant impact on my coding style
and how much I enjoy building applications.
NOTE: I would highly recommend trying out the examples in this lesson yourself. You can do this within an example Angular application if you like, or you could use the RxJS StackBlitz to play around, or you could even use the NodeJS REPL as I describe in this video.
Creating Observables with RxJS
The way we create an Observable with RxJS is basically the same as using the
Observable
class we created in the last lesson:
import { Observable } from 'rxjs';
const myObservable$ = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.next(4);
subscriber.next(5);
});
And we use it in the same way:
const observer = {
next: (data) => console.log('My next was called with', data),
complete: () => console.log('The observable has finished emitting data'),
error: (err) => console.log('The following error ocurred', err)
}
myObservable$.subscribe(observer);
or more simply:
myObservable$.subscribe((data) => console.log(data));
We can manually create an observable with RxJS like this, but more typically we will either just be using an observable that has already been created for us (e.g. something we have received from Angular) or we will use a creation operator to create the observable stream for us.
Creation Operators
RxJS comes with over 100 operators that can create or transform observable
streams for us. Again, not to make them seem more mystical than they are, an
operator
is really just a function. We have two types of operators in RxJS:
creation operators that create new observable streams for us, and
pipeable operators that take an existing observable as an input, and return
a new observable.
We will look at more of these operators in detail in a later lesson, but let’s just focus on a couple of basic ones now.
Let’s look at our example from before again of creating an observable that emits
the numbers 1
through to 5
:
const myObservable$ = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.next(4);
subscriber.next(5);
});
We could simplify this a bit by doing this:
const myObservable$ = new Observable(subscriber => {
for(let i = 1; i <= 5; i++){
subscriber.next(i);
}
});
Or, we could use the from
creation operator from RxJS:
import { from } from 'rxjs'
let myObservable$ = from([1, 2, 3, 4, 5]);
This will give us the same result. When we subscribe to it, it will output:
1
2
3
4
5
The from
creation operator can create an Observable from an array like above,
but it can also automatically create an observable from any array-like object,
a Promise, an iterable object, or an Observable-like object. Most commonly, in
my code at least, I use from
with arrays.
Another common creation operator is of
:
import { of } from 'rxjs'
let myObservable$ = of([1, 2, 3, 4, 5]);
This is very similar, except it won’t turn the array into individual emissions on the stream. This will create a stream that just emits the entire array as a single value. If we subscribe to this, we would get:
[1, 2, 3, 4, 5]
Sometimes it is useful for something to be a stream of a value, rather than just
being the value, and of
is good for these cases. Maybe we just need a stream
of a simple string value, and we could do this:
const myValue$ = of('some string');
Why we would want to do this might not be clear right now, but hopefully it will
become clearer as we progress. There are other creation operators that are
extremely useful, like combineLatest
, but that is a bit more complex and we
are going to get to it later.
The Pipe Method
As well as creation operators that create new observables for us, we also have other operators that can “transform” an existing observable. It is important to note that in this “transformation” the operator does not change the existing observable stream, it takes the existing observable stream as an input and returns a new stream.
These are the pipeable operators, which are named as such as they are passed
into the pipe
method that can be chained on to an observable like this:
import { map, filter } from "rxjs/operators"
myObservable$.pipe(
map((value) => value * 2),
filter((value) => value < 7)
)
Let’s assume that myObservable$
is the stream we created before that emits the
numbers one through five in sequence:
1
2
3
4
5
If we subscribe
to this new stream created by the pipe:
myObservable$.pipe(
map((value) => value * 2),
filter((value) => value < 7)
).subscribe((val) => console.log(val));
We won’t get the numbers 1
through to 5
being logged out to the console.
Have a guess at what you think the result of this stream will be.
Click here to reveal solution
Solution
The result will be:
2
4
6
Let’s talk about exactly why. What is happening at a surface level is
reasonably intuitive since RxJS operators have such a declarative syntax
(more on that later), and the map
and filter
RxJS operators behave similarly
to the map
and filter
array methods (only they act on data emitted from
streams, not elements of arrays).
We start with our standard stream emissions:
1
2
3
4
5
Then we apply this operator to multiply all values by 2
:
map((value) => value * 2)
Which results in:
2
4
6
8
10
And then we use this operator:
filter((value) => value < 7)
Which will cause only values less than 7
to be emitted, which results in:
2
4
6
But let’s talk about what is happening at a deeper level with the pipe
method,
as understanding this will become more important when you are composing streams
together in more complex ways. Let’s take a look at this again:
import { map, filter } from "rxjs/operators"
myObservable$.pipe(
map((value) => value * 2),
filter((value) => value < 7)
).subscribe((val) => console.log(val));
When we subscribe
to this, we aren’t subscribing to the myObservable$
stream. We are subscribing to the observable stream that the filter
operator
returns. When we subscribe to that filter
operator, it will automatically
subscribe to the stream returned by map
so that it can get the value it needs
to do its job and return the new stream for us. However, when filter
subscribes to map
, the map
operator also needs the value from the input
stream which is our original myObservable$
stream, so map
subscribes to
myObservable$
.
We have one long chain of subscriptions. We trigger the first subscription which will be on whatever stream the last operator returns, then each operator will subscribe to the operator above it until we get all the way to the input stream:
- We subscribe to the stream returned by
filter
filter
subscribes to the stream returned bymap
map
subscribes to the input stream
We just have two operators piped on here, but it is not uncommon to have even
5 or more operators inside of the pipe
. Another useful operator, especially
for learning and debugging, is the tap
operator:
import { map, filter, tap } from "rxjs/operators"
myObservable$.pipe(
tap((value) => console.log("Before map: ", value)),
map((value) => value * 2),
tap((value) => console.log("Before filter: ", value)),
filter((value) => value < 7)
).subscribe((val) => console.log("Stream emitted:", val));
The tap
operator doesn’t actually do anything — it doesn’t modify the stream
in any way. All it does is return the stream above it unchanged, but it does
serve two purposes:
- It allows us to see what the value is at that point within the
pipe
(great for debugging) - It allows us to execute arbitrary code. If we want to run some “side effect” (e.g. maybe we want to set some value outside of the stream) we can do that with a
tap
without actually impacting the stream itself
Let’s imagine our series of operators were not giving us the results we were
expecting, we could add in a few tap
operators as above and we would see this
logged out to the console:
Before map: 1
Before filter: 2
Stream emitted: 2
Before map: 2
Before filter: 4
Stream emitted: 4
Before map: 3
Before filter: 6
Stream emitted: 6
Before map: 4
Before filter: 8
Before map: 5
Before filter: 10
We can easily see here where the stream stops emitting values. The map
, tap
,
and filter
operators are perhaps the most commonly used operators, but there
are a few more that we will also commonly use. We are going to go through some
more of those, and look at some realistic applications of them, in the next
lesson.
Recap
What are the two types of operators RxJS provides?
Incorrect
Correct!
Incorrect
What does a creation operator do?
Correct!
Incorrect
Incorrect
What does a pipeable operator do?
Incorrect. This is close, but a pipeable operator does not modify the existing stream. It can use an existing stream as input, but it will return a new stream
Correct!
Which of the following is the correct usage of a pipeable operator?
Incorrect
Correct!
Incorrect