Graduate Program KB

RxJS and Observables


RxJS

  • Reactive Extensions for JavaScript
  • Library for composing asynchronous and event-based programs
    • Uses observable sequences
  • Combines the Observer pattern with the iterator pattern and functional programming

The observer pattern is a software design pattern in which an object, which is called the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes. This is usually done by calling one of their methods.
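
The pattern can be sketched in plain JavaScript without RxJS. This is only an illustration: the class name NewsletterSubject and its attach/detach/notify methods are hypothetical names, not part of any library.

```javascript
// Hypothetical subject: keeps a list of observers and notifies each one.
class NewsletterSubject {
  constructor() {
    this.observers = []
  }

  // Register a dependent (observer).
  attach(observer) {
    this.observers.push(observer)
  }

  // Remove a dependent so it stops receiving notifications.
  detach(observer) {
    this.observers = this.observers.filter((o) => o !== observer)
  }

  // Notify every observer by calling one of its methods.
  notify(state) {
    this.observers.forEach((observer) => observer.update(state))
  }
}

const received = []
const alice = { update: (state) => received.push('alice: ' + state) }
const bob = { update: (state) => received.push('bob: ' + state) }

const subject = new NewsletterSubject()
subject.attach(alice)
subject.attach(bob)
subject.notify('issue 1')

subject.detach(bob)
subject.notify('issue 2')

console.log(received)
// [ 'alice: issue 1', 'bob: issue 1', 'alice: issue 2' ]
```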

Push and Pull

  • Protocols which describe how a data producer can communicate with a data consumer
  • As the names suggest, they are opposites

Pull

  • Consumer determines when it receives data from producer
  • Essentially a function
    • A function is defined (production)
    • The function is called later (consumption)
  • The producer (function definition) has no idea when or how the data will be consumed

The function CALL pulls the returned value/data from the producer (the function definition). Generators and iterators are also examples of pull systems, in which multiple values can be pulled.
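
A small sketch of both pull systems in plain JavaScript. The function names produceValue and produceValues are made up for this example.

```javascript
// Producer: a function definition. It does nothing until it is called.
function produceValue() {
  return 42
}

// Consumer: the call pulls a single value out of the producer.
const single = produceValue()

// A generator is a pull system which can hand out multiple values.
function* produceValues() {
  yield 1
  yield 2
  yield 3
}

const iterator = produceValues()
const first = iterator.next().value // the consumer decides when to pull
const second = iterator.next().value
const rest = [...iterator] // pull whatever is left

console.log(single, first, second, rest)
// 42 1 2 [ 3 ]
```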

Push

  • Producer determines when to send data to the consumer
  • Consumer unaware when it will receive data
  • Example of a Push system in JS?
    • Hint: We did a TDD workshop on it...

Push/Pull

RxJS introduces the notion of observables which allow us to push multiple values. We could previously only push single values with a promise.

Stream

  • Sequence of data values over time
  • Discussed in Kyle's NodeJS course
  • Example: Incrementing numbers, printed each second

Observables

  • Functions which return a stream of values over time
  • They are like a wrapper around a stream
  • Utilise the push system
    • Allow for multiple values to be pushed
  • Don't do anything unless subscribed to

Subscribing to an observable runs the function that was passed in when the observable was created. This happens every time the observable is subscribed to, so each subscribed observer owns an independent execution of the Observable. This makes an observable unicast: each execution pushes its data to exactly one subscriber. Several subscribers each get their own execution, which can make the observable appear multicast.

Subscriptions

  • Calling the subscribe method connects an Observer to an Observable and starts the Observable's execution
  • A subscription can be cancelled by calling unsubscribe()
  • Subscribing to an Observable is analogous to calling a Function

Calling a function (func.call() for example) essentially means 'give me one value synchronously', whereas observable.subscribe() means 'give me any number of values, either synchronously or asynchronously'.

So Far

observable.subscribe(??);

Observer

  • Object which receives notifications from an Observable
  • Simply an object with three callbacks, one for each type of notification an Observable may deliver (next, error and complete)

Observer Continued

const observer = {
  next: (x) => console.log('Next Value: ' + x),
  error: (err) => console.error('Error Received: ' + err),
  complete: () => console.log('Complete Notification Received'),
}
observable.subscribe(observer)

Observer Continued

foo.subscribe((y) => {
  console.log(y)
})

// Equivalent

const nextOnlyObserver = {
  next: (y) => console.log(y),
}
foo.subscribe(nextOnlyObserver)

In the previous examples of observers, we tended to pass only a single callback function to an observable's 'subscribe' method. This is interpreted as the 'next' callback handler. Behind the scenes in observable.subscribe, an Observer object is created using the provided callback as the 'next' handler. Omitting the other functions means that some types of notifications received from the observable will be ignored (complete and error in this example).

Operators

  • Functions used to manipulate Observable streams
  • Allow for multiple operators to be chained to form complex pipelines
    • Shape: Take in an observable stream and return a new observable stream
  • Two types of operators

Operators Continued

  • Pipeable operators
    • Pipeable Operator Factories (take in parameters)
    • Pipeable Operators
  • Creation Operators
    • Static creation operators (like interval, or in the later example we have fromEvent)

Pipeable operators are the kind which can be piped to Observables (map, take, filter, etc.). Creation operators are those used to create an observable with some predefined behaviour (starting values for example), or to join observables together (of, from, concat, etc.).

Subject

  • Special observable which allows values to be multicast to many observers
  • They maintain a list of listeners
  • Running analogy of a concert
    • One singer/band (subject)
    • Large audience of people (observers)

A Subject is also an OBSERVER. This means that, if we wished, we could pass a subject as an argument to observable.subscribe.

observable.subscribe(subject)

Behaviour Subject

  • Gives each new subscriber the most recent value emitted before it subscribed
  • Concert Example:
    • You arrive late
    • Your friend tells you what the previous song was
    • You watch the rest of the concert

Replay Subject

  • Keeps a buffer of previously emitted values
  • When subscribed to, will replay those values
  • Concert:
    • You arrive late
    • Your friend has a list of all the previous songs that played
    • You continue to watch the rest of the concert

Replay Subject Continued

  • We can tell the ReplaySubject how many values to replay
  • The example below will replay the 3 previous values
const subject = new ReplaySubject(3)

AsyncSubject

  • Only the last value is sent to the observers, and only when the execution completes
  • Concert:
    • You miss the whole concert
    • You see your friend afterward
    • He tells you only the last song of the concert
    • You do not get to see any more of the concert

Use Cases

Normal Way:

document.addEventListener('click', () => console.log('Clicked!'))

RxJS:

import { fromEvent } from 'rxjs'
fromEvent(document, 'click').subscribe(() => console.log('Clicked'))

Use Cases Continued

  • Often we want to create a stream from an array (maybe from a promise which resolves to an array)
import { of } from 'rxjs'
import { mergeAll } from 'rxjs/operators'

const myExampleArray = [1, 2, 3];
const myObservableStream = of(myExampleArray).pipe(
  mergeAll(),
);

The of() creation operator wraps the array in an observable which emits the whole array as a single value. We then use .pipe() along with mergeAll() to flatten this into a stream of the individual values taken from the array. (from(myExampleArray) produces the same stream directly.)

Use Cases Continued

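import { from } from 'rxjs'
import { filter, mergeAll, mergeMap } from 'rxjs/operators'

const numberOfConcurrentEmails = 5

let emailsSent = 0
let failedEmails = 0

const sendEmailObserver = {
  next: (success) => emailsSent++,
  // An error notification ends the stream, so this runs at most once
  error: (err) => failedEmails++,
  complete: () => console.log('Emails Sent: ' + emailsSent + ', Failed Emails: ' + failedEmails),
}

// Assumption: getAllUsersFromDatabase() returns a promise which resolves to an
// array of users. mergeAll() turns that array into one emission per user.
const allUsers = from(getAllUsersFromDatabase()).pipe(mergeAll())

allUsers.pipe(
  filter((user) => user.isActive),
  // The second argument limits how many sendEmail calls run at once
  mergeMap((user) => from(sendEmail(user.email)), numberOfConcurrentEmails),
).subscribe(sendEmailObserver)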

  1. We first create an observer with its three functions. This is called sendEmailObserver.
  2. We then create an observable called allUsers, using the from() creation operator. We pass from() the promise returned by the async function getAllUsersFromDatabase().
  3. Next we filter the observable stream to only active users.
  4. mergeMap runs the async function sendEmail for each user (these calls run in parallel, so we pass numberOfConcurrentEmails so as not to overload our email server).

Use Cases Continued

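import { fromEvent } from 'rxjs'
import { ajax } from 'rxjs/ajax'
import { debounceTime, distinctUntilChanged, filter, map, switchMap } from 'rxjs/operators'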

const searchBox = document.getElementById('search-box')

const typeahead = fromEvent(searchBox, 'input').pipe(
  map((e) => e.target.value),
  filter((text) => text.length > 2),
  debounceTime(400),
  distinctUntilChanged(),
  switchMap((searchTerm) => ajax(/* ... {searchTerm}*/)),
)

typeahead.subscribe((data) => {
  // handle data from api
})

A typeahead is a method for progressively searching for and filtering through text. In this case we have a text box, and fromEvent creates an observable of its input events. We then use pipeable operators to shape the stream to suit our needs. We know what map and filter do. debounceTime(400) ensures that we aren't doing too much work: it only lets a value through once 400ms have passed without a newer value arriving. distinctUntilChanged() will not let a value through unless it differs from the last one. switchMap starts a request for each search term that gets through and drops the result of any earlier request still in flight.

Final Notes:

  • Promises:
    • Single async operations (fetching data from API)
  • Observables:
    • Handling streams of data (user input, HTTP responses, values emitted over time)
  • Subjects:
    • When multiple parts of an application need to communicate/react in real time