Operators & Subscriptions (Observables)
Introduction
We discussed Observables in the previous post. Observables provide us data stream over time. Once an Observables produces data, we need to be able to modify/manipulate it or use it. To do this is where operators and subscription come in.
Operators are functions that operate on an Observable and return an Observable. This allows us to chain these operators. Each operator in the chain modifies the Observable that results from the operation of the previous operator.
Operators in a chain do not work simultaneously, but they operate in sequence, each one working on the Observable generated by the operator immediately before in the chain.
Subscription is done using the subscribe
operator. A subscribe
operator allows the observer to connect with an Observable. For an observer to get data or error from an Observable, it first has to subscribe
to that Observable.
Operators
Operators in RxJS are categorized into two sections:
- Pipeable operators are operators that can be chained together. These are pure functions that take an observable as input and provide an observable as output.
observeable.pipe(
operator1(),
operator2(),
operator3(),
operator3(),
)
operator1
will take in the observable
, perform an operation on it and emit an observable. The emitted observable from operator1
is passed to operator2
as input (and so forth through the rest of the operators).
filter
, mergeMap
and forkJoin
are some examples of pipeable operators.
- Creation operators are standalone functions that create a new Observable.
import { of } from 'rxjs';
const observable = of(1, 2, 3);
The variable observable
is an Observable that will emit 1, 2, and 3 (in sequence).
create
, of
and from
are examples of creation operators.
Subscription
Before we get into subscription, let’s understand the difference between hot and cold observable.
A “cold” 🥶 Observable does not begin to emit values until an observer has subscribed to it.
A “hot” 🔥 Observable, on the other hand, can begin emitting values at any time, and a subscriber may begin observing emitted values at any time. But, the subscriber might be missing out on any values emitted before the time of the subscription.
So how do we subscribe to an Observable?
const observable = Observable.create((observer:any) => {
observer.next("Hello World!");
})
observable.subscribe((message) => console.log(message)); // Hello World!
We’ve created a variable observable
that is an Observable. It returns or emits the value “Hello World!”. We can subscribe
to observable
and get that value using a callback to subscribe
.
Our observer can implement 0 to 3 methods in RxJS: onNext
, onError
and onCompleted
.
onNext
is called when the Observable emits a value.
onError
is called when the Observable fails to generate value.
onCompeted
is called by the Observable when it has called onNext
for the final time (if it hasn’t run into an error).