Unit 11 Reactive Programming: RxJS library 簡介
什麼是 Reactive Programming
Reactive Programming (回應式程式設計) , 是以非同步資料串流(asynchronous data stream)為形式的程式設計思維. 觀察者(Observer)透過訂閱可觀察的(Observable)物件, 成為該物件的訂閱者(Subscriber). 可觀察的物件產生的資料會主動推向訂閱者, 訂閱者便可以即時的反應資料的變化. Ref:Ref: Reactive programming - Wikipedia

Figure Source: Reactive Programming in JavaScript with RxJS.
考慮底下的例子:
1
2
3
4
let a1 = 2;
let a2 = 4;
let b = a1 + a2; // b = 6
let a1 = 3  // b = ?
b 的值, 在命令式編程(imperative programming)的思維下, 為 6. 命令式編程指示函數何時啟用, 在 b = a1 + a2 時, 將運算的結果指派給 b.
當 a1 的值改變為 3 時, b 的值會是多少呢? 在命令式編程下, b 的值並不會改變, 因為 a1 的值改變後, 我們並沒有再次呼叫 b = a1 + a2 重新計算 b 的值.
回應式程式設計思維用不同的角度看 b = a1 + a2:
a1 + a2可視為一個可觀察的物件, 當有異動時, 運算的結果會自動送出(emit).b是一個觀察者, 對a1 + a2運算結果有興趣; 對b來說a1 + a2是一個可觀察的物件。=是訂閱動作, 將觀察者和可觀察物件繫結在一起, 讓觀查者變成訂閱者.- 所以, 當 
a1的值變成為 3 時,b的值自動變成7, 我們不用再次呼叫b = a1 + a2進行計算. 
Reactive Programming 可以讓應用程式具備非同步處理的能力, 或者事件導向式處理能力, 事件串流被主動推送到訂閱者處理事件.
應用舉例:
- GUI 的 MVC 模型, 當 Model 的資料有異動時, View 會自動的更新.
 - 另一個最常見的例子是對後端提出 http request。Http Response 的等待時間受到眾多因素影響。
 - 在 Reactive Programming 的思維下, 提出 Http Request 之後, 我們可以訂閱 Response 的結果。
    
- Response 回來後, Response 的內容會直接推送給訂閱者。
 - 在等待資料推送的過程中, 程式執行緒並不會被卡住, 可以執行其它程式。換句話說, 程式不會因網路速度慢而造成 UI 的凍結。
 
 
RxJS 術語(terminology)與舉例
RxJS 重要術語:
- Observable: 隨時間推送資料的資料串流
 - Observer: 資料串流的消費者(consumer), 對取得的資料做處理, 例如印出資料
 - Subscription: 表示 Observable 與 Observer 間的訂閱關係。
 - Operator: 串流資料轉換的函數
 
底下例子, 每隔 1 秒鐘印出計數的數字 0, 1, 2, 3…, 到第 5 秒的時候停止。
Reactive Programming 的思維下,
- Observable: 有個物件能夠每隔一秒產生一個數字
 - Observer: 對 Observable 進行訂閱, 印出得到的數字
 - Subscription: 描述 Observable 及 Observable 間的關係, 提供 unsubscribe() 取消訂閱, 結束關係。
 
Demo u11-ex1 @ StackBlitz
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import { interval, Observer, Subscription } from "rxjs";
// Observable 可觀察串流
const source$ = interval(1000);
// Observer 觀察者. 
// <T> 指定可處理的資料型態
let myObserver: Observer<number> = {
  // 取得下個數字後的處理方法
  next: value => console.log(value),
  // 發生錯誤時的處理方法
  error: err => console.log(err),
  // 串流推送完畢的處理方法
  complete: ()=> console.log('')
};
// 建立訂閱關係
const subscription: Subscription = source$.subscribe(myObserver);
// 5 秒後, 結束訂閱關係
setTimeout(() => {
  console.log("Unsubscribe");
  subscription.unsubscribe();
}, 5000);
Observable 物件提供 subscribe(), 可直接傳入 Observer, 告訴 Observable 有那些 Observer 向其訂閱, 該方法的簽名如下:
1
subscribe(observer?: PartialObserver<T>): Subscription
Src: Observable.subscribe() @ RxJS
此方法的另一個多載可以直接傳入 next, error, complete三方法的個函數:
1
subscribe(next: null, error: null, complete: () => void): Subscription
Src: Observable.subscribe() @ RxJS
RxJS Play Ground
StackBlitz 提供線上撰寫 RxJS 的編輯器

建立 Observer 及其處理串流的方法
Observer (觀察者) 是具有三個回呼方法(callbacks)的介面(interface),用來監聽 Observable 送出的資料。
向 Observable 訂閱資料的人必須實作此介面, 提供 handler 方法。
界面中的方法包括:
next: 要求取得下個值。Observable 將串流中的下一個元素推向 Observer; Observer 提供方法進行處理。error:Observable將錯誤訊息推向Observer,Observer提供方法進行處理complete:Observable通知Observer串流資料全部輸出完成, Observer 提供方法進行處理。
以下是 Observer 的介面定義:
1
2
3
4
5
6
interface Observer<T> {
  closed?: boolean
  next: (value: T) => void
  error: (err: any) => void
  complete: () => void
}
參考: Observer Interface @ rxjs-dev
再看另外一個例子。這個例子將陣列資料轉成非同步的資料 Observable<string>。此外, 在訂閱時, 可以傳入具有名稱的 Observer: myObserver 或者不具名的 Observer 物件。
Demo code u11-ex2 @ StackBlitz
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { from, Observer, of, Subscription, Observable } from "rxjs";
import { map } from "rxjs/operators";
const disneyFriends = ['米老鼠', '高飛', '唐老鴨', '布魯托'];
// 建立 Observer
let myObserver: Observer<string> = {
  next: (friend) => console.log(friend),
  error: err => console.log(err),
  complete: () => console.log('所有的朋友都來了!!')
};
// 使用 from 運算子將陣例轉成 Observable 資料
// 向 Observable 進行訂閱, 訂閱時傳入有名稱的 Observer
from(disneyFriends).subscribe(myObserver);
// 使用不具名的 Observer 物件
from(disneyFriends).subscribe( {
  next: friend => console.log(friend, ' bye bye'),
  error: err => console.log(err),
  complete: () => console.log('所有朋友都離開了!!')
});
建立 Observable
有數種方式可以將一般的資料建立成 Observable Stream Data。
of() RxJS function
使用 of() 將一連串的數字轉成串流:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ux11-ex3
import { interval, Observer, of, Subscription } from "rxjs";
import { map } from "rxjs/operators";
// 建立 Observable Stream
const source$ = of(1, 2, 3, 5, 9, 10, 11);
let myObserver: Observer<number> = {
  next: value => console.log(value),
  error: err => console.log(err),
  complete: ()=> console.log('串流推送完成')
};
const subscription: Subscription = source$.subscribe(myObserver);
from() RxJS function
使用 from(myArray) 將陣列或者 collection 變成串流:
1
2
3
4
5
6
7
8
9
10
11
12
13
import { from, Observer, Subscription } from "rxjs";
const myArray = ['A', 'B', 'C', 'D', 'E'];
// const source = of(1, 2, 3, 5, 9, 10, 11);
 const source = from(myArray);
let myObserver: Observer<string> = {
  next: value => console.log(value),
  error: err => console.log(err),
  complete: ()=> console.log('串流推送完成')
};
const subscription: Subscription = source.subscribe(myObserver);

interval() RxJS function
固定間隔一段時間產生 Creates an Observable that emits sequential numbers every specified interval of time, on a specified SchedulerLike.

底下的例子取出前 5 個數字後即停止:
1
2
3
4
5
6
7
8
9
import { from, Observer, of, Subscription, Observable, interval } from "rxjs";
import { take } from "rxjs/operators";
//#1
const take5Elements = interval(500).pipe(take(5));
//#2
take5Elements.subscribe( x => console.log(x));
- take pipe 運算子 讓我們指定要取幾個元素
 - 我們使用 subscribe() 的另一種簽名, 可以直接傳入 
next,error,complete三方法的個函數: 
1
subscribe(next: null, error: null, complete: () => void): Subscription
Src: Observable.subscribe() @ RxJS
輸出結果:

new Observable() 方法
使用 Observable 的建構子建立 Observable 物件。
建立時, 要傳具 Observer 介面的物件, 我們將之稱為 subscriber。
此方法會呼叫傳入的 subscriber 的 next(), error(), 及 complete() 方法。
Demo Code u11-ex3 @ StackBlitz
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import { from, Observer, of, Subscription, Observable } from "rxjs";
import { map } from "rxjs/operators";
// const subscription: Subscription = source.subscribe(myObserver);
// 建立
const myObservable = new Observable( function subscribe(subscriber){
  // subscriber is a observer
  const myArray = ['A1', 'B1', 'C1', 'D1', 'E1'];
  myArray.forEach( (element) => {
    // 呼叫 observable.next() 處理陣列中的元素
    subscriber.next(element);
  })
  // 完成後, 呼叫 observable.complete()
  subscriber.complete();
});
// 建立 Observer 物件提供 next, error, complete 等事件發生時的處理
let myObserver: Observer<number> = {
  next: value => console.log(value),
  error: err => console.log(err),
  complete: ()=> console.log('串流推送完成')
};
myObservable.subscribe(myObserver);
輸出結果

Pipeable Operators (可串接運算子)
Reactive Programming 中的可串接運算子為函數, 將輸入依據規則轉換成輸出。
可串接運算子是一個函數, 接受一個 Observable 作輸入, 之後輸出另一個Observable, 亦即將一個 Observable 轉換成另一個。 這是一個單純操作(pure operation):來源的 Observable 保持不變。

Ref: RxJS Operators Explained with Example (2020) @ TechGeekNext »
上述圖形的 RxJS 程式碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { Observer, of } from 'rxjs'; 
import { filter, map } from 'rxjs/operators';
// Convert one observable to another observable
const source$ = of(1,2,3).pipe(
  map( x => x*2),
  filter( x => x <5)
);
// subscribe the observable
source$.subscribe(
  (x) => console.log(x),
  (error) => console.log(error),
  ()=>console.log('Complete')
);

Pipeable Operator 的類型
使用 pipe() 為 Observable 建立元素轉換的管道。
在 pipe() 的輸入參數內可放入多個 Pipeable Operator, 依序轉換 Observable 送出的元素。
例如:
1
2
3
4
const source$ = of(1,2,3).pipe(
  map( x => x*2),
  filter( x => x <5)
);
Pipeable Operator 的類型
Pipeable operator 的類型為: creation (建立), transformation(轉換), filtering(過濾), joining(串接), multicasting(群播), error handling(錯誤處理), utility(工具), etc.
完整的類型請參考: Categories of operators 小節 @ RxJS
marble diagram (彈珠圖)
marble diagram (彈珠圖)常用來圖示 Pipeable operator 的元素操作。 RxMarbles: Interactive diagrams of Rx Observables 提供各類 operators 的 marble diagram (彈珠圖)。

Marble Diagram 的閱讀方式:
- Input Observable: 輸入的 Observable (內容不受 Operator 的影響, 因為是 pure operation)
 - Operator: 對 Observable 元素作用的運算子
 - Output Observable: 運算後輸出的 Observable
 
FILTERING OPERATORS (過濾運算子)
Filter Operator

filter 的簽名為:
1
filter<T>(predicate: (value: T, index: number) => boolean, thisArg?: any): MonoTypeOperatorFunction<T>
傳入一個 predicate 方法判斷元素為 true or false. 該方法接受二個參數的輸入:
- value: T - 元素值, 型態為 T
 - index: number - 表示為第幾個元素
 
運算子執行完成為得到一個 MonoTypeOperatorFunction<T> 的 Observable.
Example for the Filter operator @ StackBlitz.com
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import { interval, Observable,  Observer, of, timer } from 'rxjs'; 
import { filter, map, take } from 'rxjs/operators';
// Create an observable
// 每隔 1 秒產生一個介於 0 到 99 間的亂數
const source$ = new Observable( function subscribe(subscriber){
  setInterval( ()=>{
    const rn: number = Math.round(Math.random() * 100);
    subscriber.next(rn);
  } , 1000)
});
const myObserver: Observer<number> = {
  next: (value) => console.log(value, 'index: '),
  error: err => console.log(err),
  complete: () => console.log('Complete!!')
}
const sourceFiltered$ = source$.pipe(
  filter( (value, idx) => value > 10 && idx < 5),
);
sourceFiltered$.subscribe(myObserver);

Take Operator
1
take<T>(count: number): MonoTypeOperatorFunction<T>
取得前面數個元素, 例如前 5 個。

Example of the Take Operator
1
2
3
4
5
6
7
8
9
10
11
12
13
import { of, interval, Observer } from 'rxjs'; 
import { map, take, filter, reduce } from 'rxjs/operators';
const source$ = of(1,2,3,4).pipe(take(2));
const myObserver: Observer<number> = {
  next: value => console.log(value),
  error: err => console.log(err),
  complete: () => console.log('Complete!!')
}
source$.subscribe(myObserver);
Mathematical and Aggregate Operators (數學與聚合運算字)
reduce() 運算子對元素進行聚合運算, 如 sum, 把多個元素值, 聚合成單一個值。

此運算子可傳入以下的規格的 accumulator 方法:
1
accumulator: (acc: T | R, value: T, index?: number) => T | R
方法的輸入參數:
acc: T- 聚合值, 第 1 個參數, 資料型態為 Observable 中原本的資料型態T或者聚合值轉換成另一種資料型態R.value: T- Observable 的發送值, 第 2 個參數, 資料型態為 Observable 中原本的資料型態T。index?: number: 目前發送值的序號(即, 第幾個發送值)
方法的輸出資料型態:
T | R: Observable 中原本的資料型態 T 或者聚合值轉換成另一種資料型態 R.
Example for the Reduced operator 計算前 5 個發送值的總合:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { of, interval, Observer } from 'rxjs'; 
import { map, take, filter, reduce } from 'rxjs/operators';
const source$ = interval(500).pipe(
  filter (x => x > 0), 
  take(5),
  reduce( (acc, x) => acc + x)
  );
const myObserver: Observer<number> = {
  next: value => console.log(value),
  error: err => console.log(err),
  complete: () => console.log('Complete!!')
}
source$.subscribe(myObserver);
TRANSFORMATION OPERATORS (轉換運算子)
map() 用來將 Observable 中的元素投射(project)到另一個值域。

map() 可傳入以下簽名的 project 函數:
1
project: (value: T, index: number) => R
輸入參數:
value: T- Observable 的發送值, 第 1 個參數, 資料型態為 Observable 中原本的資料型態T。index: number: 目前發送值的序號(即, 第幾個發送值)
輸出資料型態:
R: 投射後資料的資料型態 R
Example for the Map operator 取前 3 個 大於 10 的值, 將其值放大 10 倍:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { of, interval, Observer } from 'rxjs'; 
import { map, take, filter } from 'rxjs/operators';
const source$ = interval(500).pipe(
  filter (x => x > 10), 
  take(3), 
  map(x => 10 * x ));
const myObserver: Observer<number> = {
  next: value => console.log(value),
  error: err => console.log(err),
  complete: () => console.log('Complete!!')
}
source$.subscribe(myObserver);
Example: 將 number 投射到 string 資料型態
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { of, interval, Observer, from } from 'rxjs'; 
import { map, take, filter } from 'rxjs/operators';
const grades = [100, 90, 80, 70, 60, 50, 60, 70];
// 訂義 coding 投射函數
let coding  = function(x: number): string {
  if ( x > 90 ) return 'A'
   else if (x > 80 ) return 'B'
   else if (x > 70) return 'C'
   else if (x > 60) return 'D'
   else return 'F';
};
// 將 coding 投射函數傳入到 map() 中
from(grades).pipe(map(coding)).subscribe(console.log);
上述的程式可以改採用匿名函數的寫法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { of, interval, Observer, from } from 'rxjs'; 
import { map, take, filter } from 'rxjs/operators';
const grades = [100, 90, 80, 70, 60, 50, 60, 70];
// 傳入一個匿名投射函數給 map()
from(grades).pipe(
  //#1
  map<number, string>((x) => {if ( x > 90 ) return 'A'
   else if (x > 80 ) return 'B'
   else if (x > 70) return 'C'
   else if (x > 60) return 'D'
   else return 'F';}
  )
  ).subscribe(console.log);
在 map()指定轉換前、後的資料型態, 使 TypeScript 能正確推論資料型態。
前述所推論出的 pipe() 的輸入與輸出資料型態分別為 number 及 string。

若不指定 <number, string>, 則輸入與輸出資料型態分別為:
- 輸入: 
number - 輸出: 
"A" | "B" | "C" | "D" | "F" 

總結
- 需要非同步操作時, 使用 
Observable。例如與後端做 HTTP 通訊, 或者監聽 UI 事件。 Observable會主動推播資料到訂閱該Observable的觀察者(Observer), 觀察者便可以即時的反應資料的變化。- 訂閱 
Observable時, 觀查者要定義 3 個方法: next, error, 及 complete。 - 可使用 
of(),from(),interval(),new Observable()等方法, 建立自己的 Observable. Observable的資料在送達 Observer 之前可以進行處理。利用pipe()可各式各樣的 pipeable operators 可對資料做: creation (建立), transformation(轉換), filtering(過濾), joining(串接), multicasting(群播), error handling(錯誤處理) 等動作。- typescript 中的函數型態(function type)的語法為:
    
1
function_name:([[parameter: type],...]) => return_type
 
例如, filter() 的可接受的函數型態為:
1
predicate: (value: T, index: number) => boolean
Ref: Writing the function type, Handbook - Functions @ www.typescriptlang.org