Image for post
Image for post
Rxjs

What is Reactive Programming ?

What is Observable ?

MeaningLess Example

Imperative

let clicks = 0;
const clickHandler = e => {
const { innerWidth: width } = window;
const { clientX: xpos } = e;
if(xpos > width/2) {
if(clicks < 10) {
clicks++;
console.log(‘count=’, clicks, ‘xpos=’, xpos);
return;
}
document.removeEventListener(‘click’, clickHandler);
}
};
document.addEventListener(‘click’, clickHandler);

Reactive way

const { fromEvent } = rxjs;
const { filter, take } = rxjs.operators;
fromEvent(document, 'click').pipe(
filter(e => e.clientX > window.innerWidth/2),
take(10),
).subscribe(e => console.log('xpos=', e.clientX));

Observer Pattern

class Subject {
constructor() {
this.listeners = new Set();
}

add(listener) {
this.listeners.add(listener);
}

remove(listener) {
this.listeners.delete(listener);
}

notify(obj) {
this.listeners.forEach(item => {
item.update(obj);
})
}
}

class Event {
constructor(name, data) {
this.name = name;
this.data = data;
}

getName() {
return this.name;
}

getData() {
return this.data;
}
}
const listener1 = {
update(evt) {
console.log(`listener1 ${evt.getName()}`);
}
}
const listener2 = {
update(evt) {
console.log(`listener2 ${evt.getName()}`);
}
}

const testSubject = new Subject();
testSubject.add(listener1);
testSubject.add(listener2);
testSubject.notify(new Event('FirstEvent', 1));

Iterator Pattern

class Range {
constructor(start, end) {
this.start = start;
this.end = end;
}

hasNext() {
return this.start < this.end;
}

next() {
if(this.hasNext()) {
return this.start++;
}
}
}
const obj = new Range(0, 3);
console.log(obj.next(), obj.hasNext());
console.log(obj.next(), obj.hasNext());
console.log(obj.next(), obj.hasNext());
console.log(obj.next(), obj.hasNext());
//JS way
const range = (start, end) => ({
[Symbol.iterator]() {
return this;
},
next() {
if(start < end) {
return { value: start++, done: false };
}
return { value: undefined, done: true} ;
}
});
const obj = range(0, 3);
for(let item of obj) {
console.log(‘item=’, item);
}
console.log(Array.from(range(0, 3)), […range(0, 3)]);
console.log(obj.next());
console.log(obj.next());
const arr = [1,2,3].values();
console.log(arr.next().value);

Observable emits its values in order -like an iterator but instead of its consumer requesting next value, the observable pushes the values to its consumers as they become available.It plays the similar role like producer in observer pattern: emitting values and pushing it to its listeners.

Push vs Pull

Rxjs is push based, source of events (Observable) will push new values to the consumer, without consumer requesting it.

Observable has two distinct features

  1. It don’t start emitting values until it has an observer.
  2. It can signal when the sequence aka stream completes.

Using Observable.create

const { Observable } = rxjs;
const { map } = rxjs.operators;
const obj = Observable.create(observer => {
observer.next(‘A’);
observer.next(‘B’);
// observer.error(new Error(‘I am Error’));
observer.complete();
observer.next(‘D’);
});
obj.subscribe({
next: console.log,
error(e){
console.error(e.message);
},
complete() {
console.log(‘completed’);
}
});
obj.pipe(
map(input => input.toLowerCase()),
).subscribe({
next: console.log,
});

Ajax request using Observable.create

const { Observable } = rxjs;
const getJson = url => Observable.create(observer => {
const http = new XMLHttpRequest();
http.onload = () => {
if(http.status === 200) {
observer.next(http.response);
observer.complete();
} else {
observer.error(new Error(http.statusText));
}
}
http.onerror = (e) => {
subscriber.error(new Error(‘Unknown Error’));
}
http.open(‘GET’, url);
http.send();
});
getJson(‘https://jsonplaceholder.typicode.com/users')
.subscribe(console.log);

Creating Observable from Array, Promise & Iterable

//Example using array
const { from } = rxjs;
from(['vue', 'react', 'angular', 'cyclejs']).subscribe(console.log);
//Example using Promise
const createPromise = () => new Promise((resolve, reject) => {
setTimeout(resolve, 1000, 'Hello Promise');
});
from(createPromise()).subscribe(console.log);
// Example using Iterator
const range = (start, end) => ({
[Symbol.iterator]() {
return this;
},
next() {
if(start < end) {
return { value: start++, done: false };
}
return { value: undefined, done: true} ;
}
});
from(range(0, 3)).subscribe(console.log);

Creating Observable from callback functions

// example using jquery
const { bindCallback } = rxjs;
const getJson$ = bindCallback($.getJSON);
getJson$('https://jsonplaceholder.typicode.com/users')
.subscribe(console.log);
// random example
const testCall = (input, cb) => {
setTimeout(() => {
const random = Math.floor(Math.random() * 100);
if(random > 50) {
cb(new Error('Error Occured'));
return;
}
cb(random);
}, 300);
}
const cb$ = bindCallback(testCall);
cb$(123).subscribe(console.log, console.error);

Basic Sequence Operators

const { from } = rxjs;
const { map, tap, filter, reduce, scan } = rxjs.operators;
from([1,2,3,4,5]).pipe(
map(a => a * 2),
).subscribe(console.log);
from([1,2,3,4,5]).pipe(
filter(a => a %2 === 0),
).subscribe(console.log);
from([1,2,3,4,5]).pipe(
filter(a => a %2 === 0),
map(a => a * a),
).subscribe(console.log);
from([1,2,3,4,5]).pipe(
filter(a => a %2 === 0),
map(a => a * a),
reduce((accum, item) => accum + item, 0),
).subscribe(console.log);
from([1,2,3,4,5]).pipe(
filter(a => a %2 === 0),
map(a => a * a),
scan((accum, item) => accum + item, 0),
).subscribe(console.log);

Error Handling

const { interval, of } = rxjs;
const { catchError, map } = rxjs.operators;
const source = interval(1000).pipe(
map(i => i > 2 ? throw new Error('Yaba Daba Duu') : i),
catchError(e => of('Stream is broken'))
);
source.subscribe(console.log)

Example Application

const { from } = rxjs;
const { ajax } = rxjs.ajax;
const { flatMap, take, tap, map } = rxjs.operators;
const getJson$ = url => ajax(url).pipe(
map(a => a.response),
);
const url = 'https://jsonplaceholder.typicode.com/posts';
const getPosts$ = id => getJson$(`${url}?userId=${id}`).pipe(
flatMap(a => from(a)),
take(1),
);
getJson$(‘https://jsonplaceholder.typicode.com/users').pipe(
flatMap(a => from(a)),
flatMap(({ id }) => getPosts$(id)),
).subscribe(console.log);

Notes

  1. All example codes can be run in https://codepen.io/
  2. RxJS should be added as external script in code pen

Originally published at https://medium.com on April 2, 2020.

Written by

Learner, Programmer, Experienced

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store