Get Reactive With RxJS

Sandeep Sharma
5 min read · Apr 2, 2020

What is Reactive Programming?

In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. With this paradigm it is possible to express static (e.g., arrays) or dynamic (e.g., event emitters) data streams with ease, and also communicate that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the changed data flow.
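
To make "propagation of change" concrete, here is a minimal sketch in plain JavaScript, without RxJS. The `createCell` helper and its `get`, `set` and `onChange` methods are hypothetical names invented for this illustration; they are not part of any library.

```javascript
// Hypothetical helper: a value holder that notifies its dependents on change.
const createCell = initial => {
  let value = initial;
  const dependents = new Set();
  return {
    get: () => value,
    set(next) {
      value = next;
      // Propagate the change to everything that depends on this cell.
      dependents.forEach(fn => fn(value));
    },
    onChange(fn) {
      dependents.add(fn);
    },
  };
};

const price = createCell(10);
const quantity = createCell(2);

// `total` is described once in terms of its inputs...
let total = price.get() * quantity.get();
const recompute = () => {
  total = price.get() * quantity.get();
};
price.onChange(recompute);
quantity.onChange(recompute);

// ...and stays up to date without being reassigned by hand.
price.set(15);
console.log(total); // 30
quantity.set(3);
console.log(total); // 45
```

The point of the sketch is only that the dependency between `total` and its inputs is declared once, and updates then flow automatically.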

What is an Observable?

A datatype to rule over everything ;-)

A Meaningless Example

Log the first 10 clicks on the right side of the window.

Imperative

let clicks = 0;
const clickHandler = e => {
  const { innerWidth: width } = window;
  const { clientX: xpos } = e;
  // Ignore clicks on the left half of the window.
  if (xpos <= width / 2) {
    return;
  }
  clicks++;
  console.log('count=', clicks, 'xpos=', xpos);
  // Stop listening as soon as the tenth click has been logged.
  if (clicks === 10) {
    document.removeEventListener('click', clickHandler);
  }
};
document.addEventListener('click', clickHandler);

Reactive way

const { fromEvent } = rxjs;
const { filter, take } = rxjs.operators;
fromEvent(document, 'click').pipe(
  filter(e => e.clientX > window.innerWidth / 2),
  take(10),
).subscribe(e => console.log('xpos=', e.clientX));

Observer Pattern

The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods.

class Subject {
  constructor() {
    this.listeners = new Set();
  }

  add(listener) {
    this.listeners.add(listener);
  }

  remove(listener) {
    this.listeners.delete(listener);
  }

  notify(obj) {
    this.listeners.forEach(item => {
      item.update(obj);
    });
  }
}

class Event {
  constructor(name, data) {
    this.name = name;
    this.data = data;
  }

  getName() {
    return this.name;
  }

  getData() {
    return this.data;
  }
}

const listener1 = {
  update(evt) {
    console.log(`listener1 ${evt.getName()}`);
  }
};
const listener2 = {
  update(evt) {
    console.log(`listener2 ${evt.getName()}`);
  }
};

const testSubject = new Subject();
testSubject.add(listener1);
testSubject.add(listener2);
testSubject.notify(new Event('FirstEvent', 1));

Iterator Pattern

In object-oriented programming, the iterator pattern is a design pattern in which an iterator is used to traverse a container and access the container’s elements. The iterator pattern decouples algorithms from containers; in some cases, algorithms are necessarily container-specific and thus cannot be decoupled.

class Range {
  constructor(start, end) {
    this.start = start;
    this.end = end;
  }

  hasNext() {
    return this.start < this.end;
  }

  next() {
    if (this.hasNext()) {
      return this.start++;
    }
  }
}
const rangeObj = new Range(0, 3);
console.log(rangeObj.next(), rangeObj.hasNext()); // 0 true
console.log(rangeObj.next(), rangeObj.hasNext()); // 1 true
console.log(rangeObj.next(), rangeObj.hasNext()); // 2 false
console.log(rangeObj.next(), rangeObj.hasNext()); // undefined false

// JS way: implement the built-in iterator protocol
const range = (start, end) => ({
  [Symbol.iterator]() {
    return this;
  },
  next() {
    if (start < end) {
      return { value: start++, done: false };
    }
    return { value: undefined, done: true };
  }
});
const obj = range(0, 3);
for (let item of obj) {
  console.log('item=', item);
}
console.log(Array.from(range(0, 3)), [...range(0, 3)]);
// obj was exhausted by the loop above, so these log { value: undefined, done: true }
console.log(obj.next());
console.log(obj.next());
// Arrays expose the same protocol through values()
const arr = [1, 2, 3].values();
console.log(arr.next().value); // 1

An Observable emits its values in order, like an iterator, but instead of the consumer requesting the next value, the Observable pushes values to its consumers as they become available. It plays a role similar to the producer in the observer pattern: it emits values and pushes them to its listeners.

Push vs Pull

In a client-server context, a push-based system is one in which the server sends updates to the client instead of the client polling the server for them. It is similar to the saying ‘Don’t call us, we’ll call you’.

RxJS is push-based: the source of events (the Observable) pushes new values to the consumer without the consumer requesting them.
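
The difference can be sketched in plain JavaScript, without RxJS. In this illustration `pushSource` is a hypothetical stand-in for an Observable: it is just a function that delivers values to whatever callback it is given.

```javascript
// Pull: the consumer asks for each value when it wants one.
const pullSource = [1, 2, 3][Symbol.iterator]();
const pulled = [];
let step = pullSource.next();
while (!step.done) {
  pulled.push(step.value);
  step = pullSource.next();
}

// Push: the producer decides when to deliver; the consumer only
// hands over a callback and waits to be called.
const pushSource = onValue => {
  [1, 2, 3].forEach(value => onValue(value));
};
const pushed = [];
pushSource(value => pushed.push(value));

console.log(pulled); // [ 1, 2, 3 ]
console.log(pushed); // [ 1, 2, 3 ]
```

Both consumers end up with the same values; what changes is who is in control of the timing.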

An Observable has two distinctive features:

  1. It doesn’t start emitting values until it has an observer.
  2. It can signal when the sequence (aka the stream) completes.
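
Both features can be seen in a toy implementation. The `TinyObservable` class below is a hypothetical, heavily simplified sketch written for this illustration only; it is not how RxJS implements Observable and it leaves out errors and unsubscription.

```javascript
// A toy Observable that only illustrates laziness and completion.
class TinyObservable {
  constructor(producer) {
    // The producer is stored, not run: nothing happens until subscribe().
    this.producer = producer;
  }

  subscribe(observer) {
    let closed = false;
    // Wrap the observer so that nothing is delivered after completion.
    this.producer({
      next: value => {
        if (!closed) observer.next(value);
      },
      complete: () => {
        if (!closed) {
          closed = true;
          observer.complete();
        }
      },
    });
  }
}

const log = [];
const letters = new TinyObservable(observer => {
  log.push('producer started');
  observer.next('A');
  observer.next('B');
  observer.complete();
  observer.next('C'); // ignored: the stream has already completed
});

log.push('before subscribe');
letters.subscribe({
  next: value => log.push(`next ${value}`),
  complete: () => log.push('completed'),
});

console.log(log);
// [ 'before subscribe', 'producer started', 'next A', 'next B', 'completed' ]
```

Note that 'producer started' is logged after 'before subscribe': creating the observable did not run the producer.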

Using Observable.create

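const { Observable } = rxjs;
const { map } = rxjs.operators;
// Note: newer RxJS versions deprecate Observable.create in favour of `new Observable(...)`.
const obj = Observable.create(observer => {
  observer.next('A');
  observer.next('B');
  // observer.error(new Error('I am Error'));
  observer.complete();
  observer.next('D'); // ignored: nothing is delivered after complete()
});
// Logs: A, B, completed
obj.subscribe({
  next: console.log,
  error(e) {
    console.error(e.message);
  },
  complete() {
    console.log('completed');
  }
});
// Each subscription runs the producer again. Logs: a, b
obj.pipe(
  map(input => input.toLowerCase()),
).subscribe({
  next: console.log,
});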

Ajax request using Observable.create

const { Observable } = rxjs;
const getJson = url => Observable.create(observer => {
  const http = new XMLHttpRequest();
  http.onload = () => {
    if (http.status === 200) {
      observer.next(http.response);
      observer.complete();
    } else {
      observer.error(new Error(http.statusText));
    }
  };
  // Network-level failures never reach onload, so report them here.
  http.onerror = () => {
    observer.error(new Error('Unknown Error'));
  };
  http.open('GET', url);
  http.send();
  // Abort the request if the consumer unsubscribes early.
  return () => http.abort();
});
getJson('https://jsonplaceholder.typicode.com/users')
  .subscribe(console.log);

Creating Observable from Array, Promise & Iterable

// Example using an array
const { from } = rxjs;
from(['vue', 'react', 'angular', 'cyclejs']).subscribe(console.log);

// Example using a Promise
const createPromise = () => new Promise((resolve, reject) => {
  setTimeout(resolve, 1000, 'Hello Promise');
});
from(createPromise()).subscribe(console.log);

// Example using an iterable
const range = (start, end) => ({
  [Symbol.iterator]() {
    return this;
  },
  next() {
    if (start < end) {
      return { value: start++, done: false };
    }
    return { value: undefined, done: true };
  }
});
from(range(0, 3)).subscribe(console.log);

Creating Observable from callback functions

// Example using jQuery
const { bindCallback, bindNodeCallback } = rxjs;
const getJson$ = bindCallback($.getJSON);
getJson$('https://jsonplaceholder.typicode.com/users')
  .subscribe(console.log);

// Random example with a Node-style callback: cb(error, result).
// bindCallback would emit the Error as an ordinary value, so
// bindNodeCallback is used to route it to the error handler instead.
const testCall = (input, cb) => {
  setTimeout(() => {
    const random = Math.floor(Math.random() * 100);
    if (random > 50) {
      cb(new Error('Error Occurred'));
      return;
    }
    cb(null, random);
  }, 300);
};
const cb$ = bindNodeCallback(testCall);
cb$(123).subscribe({ next: console.log, error: console.error });

Basic Sequence Operators

map, filter, reduce, scan, flatMap

const { from } = rxjs;
const { map, filter, reduce, scan } = rxjs.operators;

// map: logs 2, 4, 6, 8, 10
from([1, 2, 3, 4, 5]).pipe(
  map(a => a * 2),
).subscribe(console.log);

// filter: logs 2, 4
from([1, 2, 3, 4, 5]).pipe(
  filter(a => a % 2 === 0),
).subscribe(console.log);

// filter + map: logs 4, 16
from([1, 2, 3, 4, 5]).pipe(
  filter(a => a % 2 === 0),
  map(a => a * a),
).subscribe(console.log);

// reduce emits only the final accumulated value: logs 20
from([1, 2, 3, 4, 5]).pipe(
  filter(a => a % 2 === 0),
  map(a => a * a),
  reduce((accum, item) => accum + item, 0),
).subscribe(console.log);

// scan emits every intermediate accumulation: logs 4, 20
from([1, 2, 3, 4, 5]).pipe(
  filter(a => a % 2 === 0),
  map(a => a * a),
  scan((accum, item) => accum + item, 0),
).subscribe(console.log);
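
The last two pipelines differ only in `reduce` versus `scan`. As a rough sketch of that difference using plain arrays instead of streams, the hypothetical `runningTotals` helper below mimics what `scan` would emit; it is an illustration, not an RxJS API.

```javascript
// Hypothetical helper mimicking the values `scan` emits, on a plain array.
const runningTotals = (items, seed) => {
  const emitted = [];
  let accum = seed;
  for (const item of items) {
    accum += item;
    emitted.push(accum); // scan emits after every input value
  }
  return emitted;
};

const squaresOfEvens = [1, 2, 3, 4, 5]
  .filter(a => a % 2 === 0)
  .map(a => a * a);

// reduce-like: a single value, available only once the input has ended
const finalTotal = squaresOfEvens.reduce((accum, item) => accum + item, 0);
console.log(finalTotal); // 20

// scan-like: every intermediate accumulation
console.log(runningTotals(squaresOfEvens, 0)); // [ 4, 20 ]
```

This is why `scan` is the usual choice for never-ending streams such as clicks, where `reduce` would have nothing to emit until the stream completes.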

Error Handling

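const { interval, of } = rxjs;
const { catchError, map } = rxjs.operators;
const source = interval(1000).pipe(
  map(i => {
    // `throw` is a statement, so it needs a block body rather than a ternary.
    if (i > 2) {
      throw new Error('Yaba Daba Duu');
    }
    return i;
  }),
  // Replace the failed stream with a single fallback value.
  catchError(e => of('Stream is broken')),
);
source.subscribe(console.log); // 0, 1, 2, 'Stream is broken'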

Example Application

const { from } = rxjs;
const { ajax } = rxjs.ajax;
// flatMap is an alias of mergeMap.
const { flatMap, take, map } = rxjs.operators;
// Fetch a URL and emit the parsed response body.
const getJson$ = url => ajax(url).pipe(
  map(a => a.response),
);
const url = 'https://jsonplaceholder.typicode.com/posts';
// Emit only the first post of the given user.
const getPosts$ = id => getJson$(`${url}?userId=${id}`).pipe(
  flatMap(a => from(a)),
  take(1),
);
// For every user, log that user's first post.
getJson$('https://jsonplaceholder.typicode.com/users').pipe(
  flatMap(a => from(a)),
  flatMap(({ id }) => getPosts$(id)),
).subscribe(console.log);

Notes

  1. All of the example code can be run on https://codepen.io/
  2. RxJS should be added as an external script in CodePen

Originally published at https://medium.com on April 2, 2020.
