很明顯你是有興趣學習這種被稱作響應式編程的新技術(shù)才來看這篇文章的。
學習響應式編程是很困難的一個過程,特別是在缺乏優(yōu)秀資料的前提下。剛開始學習時,我試過去找一些教程,并找到了為數(shù)不多的實用教程,但是它們都流于表面,從沒有圍繞響應式編程構(gòu)建起一個完整的知識體系。庫的文檔往往也無法幫助你去了解它的函數(shù)。不信的話可以看一下這個:
通過合并元素的指針,將每一個可觀察的元素序列放射到一個新的可觀察的序列中,然后將多個可觀察的序列中的一個轉(zhuǎn)換成一個只從最近的可觀察序列中產(chǎn)生值得可觀察的序列。
天啊。
我看過兩本書,一本只是講述了一些概念,而另一本則糾結(jié)于如何使用響應式編程庫。我最終放棄了這種痛苦的學習方式,決定在開發(fā)中一邊使用響應式編程,一邊理解它。在 Futurice 工作期間,我嘗試在真實項目中使用響應式編程,并且當我遇到困難時,得到了同事們的幫助。
在學習過程中最困難的一部分是 以響應式編程的方式思考 。這意味著要放棄命令式且?guī)顟B(tài)的編程習慣,并且要強迫你的大腦以一種不同的方式去工作。在互聯(lián)網(wǎng)上我找不到任何關(guān)于這方面的教程,而我覺得這世界需要一份關(guān)于怎么以響應式編程的方式思考的實用教程,這樣你就有足夠的資料去起步。庫的文檔無法為你的學習提供指引,而我希望這篇文章可以。
在互聯(lián)網(wǎng)上有著一大堆糟糕的解釋與定義。Wikipedia 一如既往的空泛與理論化。Stackoverflow 的權(quán)威答案明顯不適合初學者。Reactive Manifesto 看起來是你展示給你公司的項目經(jīng)理或者老板們看的東西。微軟的 Rx terminology "Rx = Observables + LINQ + Schedulers" 過于重量級且微軟味十足,只會讓大部分人困惑。相對于你所使用的 MV* 框架以及鐘愛的編程語言,"Reactive" 和 "Propagation of change" 這些術(shù)語并沒有傳達任何有意義的概念。框架的 Views 層當然要對 Models 層作出反應,改變當然會傳播。如果沒有這些,就沒有東西會被渲染了。
所以不要再扯這些廢話了。
一方面,這并不是什么新東西。Event buses 或者 Click events 本質(zhì)上就是異步事件流,你可以監(jiān)聽并處理這些事件。響應式編程的思路大概如下:你可以用包括 Click 和 Hover 事件在內(nèi)的任何東西創(chuàng)建 Data stream。Stream 廉價且常見,任何東西都可以是一個 Stream:變量、用戶輸入、屬性、Cache、數(shù)據(jù)結(jié)構(gòu)等等。舉個例子,想像一下你的 Twitter feed 就像是 Click events 那樣的 Data stream,你可以監(jiān)聽它并相應的作出響應。
在這個基礎上,你還有令人驚艷的函數(shù)去組合、創(chuàng)建、過濾這些 Streams。這就是函數(shù)式魔法的用武之地。Stream 能接受一個,甚至多個 Stream 為輸入。你可以融合兩個 Stream,也可以從一個 Stream 中過濾出你感興趣的 Events 以生成一個新的 Stream,還可以把一個 Stream 中的數(shù)據(jù)值 映射到一個新的 Stream 中。
既然 Stream 在響應式編程中如此重要,那么我們就應該好好的了解它們,就從我們熟悉的"Clicks on a button" Event stream 開始。
http://wiki.jikexueyuan.com/project/android-weekly/images/issue-145/81.png" alt="響應式編程" />
Stream 就是一個按時間排序的 Events 序列,它可以放射三種不同的 Events:(某種類型的)Value、Error 或者一個" Completed" Signal。考慮一下"Completed"發(fā)生的時機,例如,當包含這個按鈕的窗口或者視圖被關(guān)閉時。
通過分別為 Value、Error、"Completed"定義事件處理函數(shù),我們將會異步地捕獲這些 Events。有時可以忽略 Error 與"Completed",你只需要定義 Value 的事件處理函數(shù)就行。監(jiān)聽一個 Stream 也被稱作是訂閱 ,而我們所定義的函數(shù)就是觀察者,Stream則是被觀察者,其實就是 Observer Design Pattern。
上面的示意圖也可以使用ASCII重畫為下圖,在下面的部分教程中我們會使用這幅圖:
--a---b-c---d---X---|->
a, b, c, d are emitted values
X is an error
| is the 'completed' signal
---> is the timeline
既然已經(jīng)開始對響應式編程感到熟悉,為了不讓你覺得無聊,我們可以嘗試做一些新東西:我們將會把一個 Click event stream 轉(zhuǎn)為新的 Click event stream。
首先,讓我們做一個能記錄一個按鈕點擊了多少次的計數(shù)器 Stream。在常見的響應式編程庫中,每個Stream都會有多個方法,如 map
, filter
, scan
, 等等。當你調(diào)用其中一個方法時,例如 clickStream.map(f)
,它就會基于原來的 Click stream 返回一個新的 Stream 。它不會對原來的 Click steam 作任何修改。這個特性稱為不可變性,它對于響應式編程 Stream,就如果汁對于薄煎餅。我們也可以對方法進行鏈式調(diào)用,如 clickStream.map(f).scan(g)
:
clickStream: ---c----c--c----c------c-->
vvvvv map(c becomes 1) vvvv
---1----1--1----1------1-->
vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->
map(f)
會根據(jù)你提供的 f
函數(shù)把原 Stream 中的 Value 分別映射到新的 Stream 中。在我們的例子中,我們把每一次 Click 都映射為數(shù)字 1。scan(g)
會根據(jù)你提供的 g
函數(shù)把 Stream 中的所有 Value 聚合成一個 Value x = g(accumulated, current)
,這個示例中 g
只是一個簡單的添加函數(shù)。然后,每 Click 一次, counterStream
就會把點擊的總次數(shù)發(fā)給它的觀察者。
為了展示響應式編程真正的實力,讓我們假設你想得到一個包含“雙擊”事件的 Stream。為了讓它更加有趣,假設我們想要的這個 Stream 要同時考慮三擊(Triple clicks),或者更加寬泛,連擊(兩次或更多)。深呼吸一下,然后想像一下在傳統(tǒng)的命令式且?guī)顟B(tài)的方式中你會怎么實現(xiàn)。我敢打賭代碼會像一堆亂麻,并且會使用一些變量保存狀態(tài),同時也有一些計算時間間隔的代碼。
而在響應式編程中,這個功能的實現(xiàn)就非常簡單。事實上,這邏輯只有 4 行代碼。但現(xiàn)在我們先不管那些代碼。用圖表的方式思考是理解怎樣構(gòu)建Stream的最好方法,無論你是初學者還是專家。
http://wiki.jikexueyuan.com/project/android-weekly/images/issue-145/82.png" alt="響應式編程" />
灰色的方框是用來轉(zhuǎn)換 Stream 函數(shù)的。首先,簡而言之,我們把連續(xù) 250 ms 內(nèi)的 Click 都積累到一個列表中(就是 buffer(stream.throttle(250ms)
做的事。不要在意這些細節(jié),我們只是展示一下響應式編程而已)。結(jié)果是一個列表的 Stream ,然后我們使用 map()
把每個列表映射為一個整數(shù),即它的長度。最終,我們使用 filter(x >= 2)
把整數(shù) 1 給過濾掉。就這樣,3 個操作就生成了我們想要的 Stream。然后我們就可以訂閱(“監(jiān)聽”)這個 Stream,并以我們所希望的方式作出反應。
我希望你能感受到這個示例的優(yōu)美之處。這個示例只是冰山一角:你可以把同樣的操作應用到不同種類的 Stream 上,例如,一個 API 響應的 Stream;另一方面,還有很多其它可用的函數(shù)。
響應式編程提高了代碼的抽象層級,所以你可以只關(guān)注定義了業(yè)務邏輯的那些相互依賴的事件,而非糾纏于大量的實現(xiàn)細節(jié)。RP 的代碼往往會更加簡明。
特別是在開發(fā)現(xiàn)在這些有著大量與數(shù)據(jù)事件相關(guān)的 UI events 的高互動性 Webapps、手機 apps 的時候,RP 的優(yōu)勢就更加明顯。10年前,網(wǎng)頁的交互就只是提交一個很長的表單到后端,而在前端只產(chǎn)生簡單的渲染。Apps 就表現(xiàn)得更加的實時了:修改一個表單域就能自動地把修改后的值保存到后端,為一些內(nèi)容"點贊"時,會實時的反應到其它在線用戶那里等等。
現(xiàn)在的 Apps 有著大量各種各樣的實時 Events,以給用戶提供一個交互性較高的體驗。我們需要工具去應對這個變化,而響應式編程就是一個答案。
讓我們做一些實踐。一個真實的例子一步一步的指導我們以 RP 的方式思考。不是虛構(gòu)的例子,也沒有只解釋了一半的概念。學完教程之后,我們將寫出真實可用的代碼,并做到知其然,知其所以然。
在這個教程中,我將會使用 JavaScript 和 RxJS 作為工具 ,因為JavaScript是現(xiàn)在最多人會的語言,而 Rx* library family 有多種語言版本,并支持多種平臺(.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy等等)。所以,無論你用的是什么工具,你都能從下面這個教程中受益。
在 Twitter 上,這個表明其他賬戶的 UI 元素看起來是這樣的:
http://wiki.jikexueyuan.com/project/android-weekly/images/issue-145/83.png" alt="響應式編程" />
我們將會重點模擬它的核心功能,如下:
我們可以忽略其它的特性和按鈕,因為它們是次要的。同時,因為 Twitter 最近關(guān)閉了對非授權(quán)用戶的 API,我們將會為 Github 實現(xiàn)這個推薦界面,而非 Twitter。這是Github獲取用戶的API。
如果你想先看一下最終效果,這里有完成后的代碼 http://jsfiddle.net/staltz/8jFJH/48/。
在 Rx 中你該怎么處理這個問題呢? 好吧,首先,(幾乎) 所有的東西都可以轉(zhuǎn)為一個Stream 。這就是Rx的咒語。讓我們先從最簡單的特性開始:"在啟動時,從API加載3個帳戶的數(shù)據(jù)"。這并沒有什么特別,就只是簡單的(1)發(fā)出一個請求,(2)收到一個響應,(3)渲染這個響應。所以,讓我們繼續(xù),并用Stream代表我們的請求。一開始可能會覺得殺雞用牛刀,但我們應當從最基本的開始,對吧?
在啟動的時候,我們只需要發(fā)出一個請求,所以如果我們把它轉(zhuǎn)為一個Data stream的話,那就是一個只有一個Value的Stream。稍后,我們知道將會有多個請求發(fā)生,但現(xiàn)在,就只有一個請求。
--a------|->
Where a is the string 'https://api.github.com/users'
這是一個我們想向其發(fā)出請求的 URL 的 Stream。每當一個請求事件發(fā)生時,它會告訴我們兩件事:"什么時候"與"什么東西"。"什么時候"這個請求會被執(zhí)行,就是什么時候這個 Event 會被映射。"什么東西"會被請求,就是這個映射出來的值:一個包含 URL 的 String。
在 RX 中,創(chuàng)建只有一個值的 Stream 是非常簡單的。官方把一個 Stream 稱作“Observable”,因為它可以被觀察,但是我發(fā)現(xiàn)那是個很愚蠢的名子,所以我把它叫做 Stream*。
var requestStream = Rx.Observable.just('https://api.github.com/users');
但是現(xiàn)在,那只是一個包含了String的Stream,并沒有其他操作,所以我們需要以某種方式使那個值被映射。就是通過 subscribing 這個 Stream。
requestStream.subscribe(function(requestUrl) {
// execute the request
jQuery.getJSON(requestUrl, function(responseData) {
// ...
});
}
留意一下我們使用了 jQuery 的 Ajax 函數(shù)(我們假設你已經(jīng)知道 should know already)去處理異步請求操作。但先等等,Rx 可以用來處理異步 Data stream。那這個請求的響應就不能當作一個包含了將會到達的數(shù)據(jù)的 Stream 嗎?當然,從理論上來講,應該是可以的,所以我們嘗試一下。
requestStream.subscribe(function(requestUrl) {
// execute the request
var responseStream = Rx.Observable.create(function (observer) {
jQuery.getJSON(requestUrl)
.done(function(response) { observer.onNext(response); })
.fail(function(jqXHR, status, error) { observer.onError(error); })
.always(function() { observer.onCompleted(); });
});
responseStream.subscribe(function(response) {
// do something with the response
});
}
Rx.Observable.create()
所做的事就是通過顯式的通知每一個 Observer (或者說是“Subscriber”) Data events( onNext()
)或者 Errors ( onError()
)來創(chuàng)建你自己的 Stream。而我們所做的就只是把 jQuery Ajax Promise 包裝起來而已。打擾一下,這意味者Promise本質(zhì)上就是一個Observable?
http://wiki.jikexueyuan.com/project/android-weekly/images/issue-145/84.gif" alt="響應式編程" />
是的。
Observable 就是 Promise++。在 Rx 中,你可以用 var stream = Rx.Observable.fromPromise(promise)
輕易的把一個 Promise 轉(zhuǎn)為 Observable,所以我們就這樣子做吧。唯一的不同就是 Observable 并不遵循 Promises/A+,但概念上沒有沖突。Promise 就是只有一個映射值的 Observable。Rx Stream 比 Promise 更進一步的是允許返回多個值。
這樣非常不錯,并展現(xiàn)了 Observables 至少有 Promise 那么強大。所以如果你相信 Promise 宣傳的那些東西,那么也請留意一下 Rx Observables 能勝任些什么。
現(xiàn)在回到我們的例子,如果你已經(jīng)注意到了我們在 subscribe()
內(nèi)又調(diào)用了另外一個 subscribe()
,這類似于 Callback hell。同樣,你應該也注意到 responseStream
是建立在 requestStream
之上的。就像你之前了解到的那樣,在 Rx 內(nèi)有簡單的機制可以從其它 Stream 中轉(zhuǎn)換并創(chuàng)建出新的 Stream,所以我們也應該這樣子做。
你現(xiàn)在需要知道的一個基本的函數(shù)是 [map(f)](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md#rxobservableprototypemapselector-thisarg)
,它分別把 f()
應用到 Stream A 中的每一個值中,并把返回的值放進 Stream B 里。如果我們也對請求 Stream 與響應 Stream 進行同樣的處理,我們可以把 Request URL 映射為響應 Promise(而 Promise 可以轉(zhuǎn)為 Streams)。
var responseMetastream = requestStream
.map(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
然后,我們將會創(chuàng)造一個叫做" Metastream "的怪物:包含 Stream 的 Stream。暫時不需要害怕。Metastream 就是一個 Stream,其中映射的值還是另外一個 Stream。你可以把它想像為 pointers:每個映射的值都是一個指向其它 Stream 的指針。在我們的例子里,每個請求 URL 都會被映射一個指向包含響應 Promise stream 的指針。
http://wiki.jikexueyuan.com/project/android-weekly/images/issue-145/85.png" alt="響應式編程" />
Response 的 Metastream 看起來會讓人困惑,并且看起來也沒有幫到我們什么。我們只想要一個簡單的響應 stream,其中每個映射的值應該是 JSON 對象,而不是一個 JSON 對象的'Promise'。是時候介紹 (Mr. Flatmap)(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md#rxobservableprototypeflatmapselector-resultselector) 了:它是 map()
的一個版本,通過把應用到"trunk" Stream 上的所有操作都應用到"branch" Stream 上,可以"flatten" Metastream。Flatmap 并不是用來"修復" Metastream 的,因為 Metastream 也不是一個漏洞,這只是一些用來處理 Rx 中的異步響應的工具。
var responseStream = requestStream
.flatMap(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
http://wiki.jikexueyuan.com/project/android-weekly/images/issue-145/86.png" alt="響應式編程" />
很好。因為響應stream是根據(jù)請求 stream定義的,所以 如果 我們后面在請求 stream上發(fā)起更多的請求的話,在響應 stream上我們將會得到相應的響應事件,就像預期的那樣:
requestStream: --a-----b--c------------|->
responseStream: -----A--------B-----C---|->
(lowercase is a request, uppercase is its response)
現(xiàn)在,我們終于有了一個響應 stream,所以可以把收到的數(shù)據(jù)渲染出來了:
responseStream.subscribe(function(response) {
// render `response` to the DOM however you wish
});
把目前為止所有的代碼放到一起就是這樣:
var requestStream = Rx.Observable.just('https://api.github.com/users');
var responseStream = requestStream
.flatMap(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
responseStream.subscribe(function(response) {
// render `response` to the DOM however you wish
});
我之前并沒有提到返回的 JSON 是一個有著 100 個用戶數(shù)據(jù)的列表。因為這個 API 只允許我們設置偏移量,而無法設置返回的用戶數(shù),所以我們現(xiàn)在是只用了 3 個用戶的數(shù)據(jù)而浪費了另外 97 個的數(shù)據(jù)。這個問題暫時可以忽略,稍后我們會學習怎么緩存這些數(shù)據(jù)。
每點擊一次刷新按鈕,請求 stream 就會映射一個新的 URL,同時我們也能得到一個新的響應。我們需要兩樣東西:一個是刷新按鈕上 Click events 組成的 Stream(咒語:一切都能是 Stream),同時我們需要根據(jù)刷新 click stream 而改變請求 stream。幸運的是,RxJS 提供了從 Event listener 生成 Observable 的函數(shù)。
var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
既然刷新 click event 本身并沒有提供任何要請求的 API URL,我們需要把每一次的 Click 都映射為一個實際的 URL?,F(xiàn)在,我們把刷新 click stream 改為新的請求 stream,其中每一個 Click 都分別映射為帶有隨機偏移量的 API 端點。
var requestStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
因為我比較笨并且也沒有使用自動化測試,所以我剛把之前做好的一個特性毀掉了?,F(xiàn)在在啟動時不會再發(fā)出任何的請求,而只有在點擊刷新按鈕時才會。額...這兩個行為我都需要:無論是點擊刷新按鈕時還是剛打開頁面時都該發(fā)出一個請求。
我們知道怎么分別為這兩種情況生成 Stream:
var requestOnRefreshStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
但我們怎樣才能把這兩個"融合"為一個呢?好吧,有 merge()
函數(shù)。這就是它做的事的圖解:
stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
vvvvvvvvv merge vvvvvvvvv
---a-B---C--e--D--o----->
這樣就簡單了:
var requestOnRefreshStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
var requestStream = Rx.Observable.merge(
requestOnRefreshStream, startupRequestStream
);
還有一個更加簡潔的可選方案,不需要使用中間變量。
var requestStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
})
.merge(Rx.Observable.just('https://api.github.com/users'));
甚至可以更簡短,更具有可讀性:
var requestStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
})
.startWith('https://api.github.com/users');
startWith()
函數(shù)做的事和你預期的完全一樣。無論你輸入的 Stream 是怎樣,startWith(x)
輸出的 Stream 一開始都是 x 。但是還不夠 DRY,我重復了 API 終端 string。一種修復的方法是去掉 refreshClickStream
最后的 startWith()
,并在一開始的時候"模擬"一次刷新 Click。
var requestStream = refreshClickStream.startWith('startup click')
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
很好。如果你把之前我"毀掉了的版本"的代碼和現(xiàn)在的相比,就會發(fā)現(xiàn)唯一的不同是加了 startWith()
函數(shù)。
到現(xiàn)在為止,我們只是談及了這個推薦 UI 元素在 responeStream 的 subscribe()
內(nèi)執(zhí)行的渲染步驟。對于刷新按鈕,我們還有一個問題:當你點擊‘刷新’ 時,當前存在的三個推薦并不會被清除。新的推薦會在響應到達后出現(xiàn),為了讓 UI 看起來舒服一些,當點擊刷新時,我們需要清理掉當前的推薦。
refreshClickStream.subscribe(function() {
// clear the 3 suggestion DOM elements
});
不,別那么快,朋友。這樣不好,我們現(xiàn)在有兩個訂閱者會影響到推薦的 DOM 元素(另外一個是 responseStream.subscribe()
),而且這樣完全不符合 Separation of concerns。還記得響應式編程的咒語么?
http://wiki.jikexueyuan.com/project/android-weekly/images/issue-145/87.jpg" alt="響應式編程" />
所以讓我們把顯示的推薦設計成一個 stream,其中每一個映射的值都是包含了推薦內(nèi)容的 JSON 對象。我們以此把三個推薦內(nèi)容分開來?,F(xiàn)在第一個推薦看起來是這樣子的:
var suggestion1Stream = responseStream
.map(function(listUsers) {
// get one random user from the list
return listUsers[Math.floor(Math.random()*listUsers.length)];
});
其他的, suggestion2Stream
和 suggestion3Stream
可以簡單的拷貝 suggestion1Stream
的代碼來使用。這不是 DRY,它會讓我們的例子變得更加簡單一些,加之我覺得這是一個可以幫助考慮如何減少重復的良好實踐。
我們不在 responseStream 的 subscribe() 中處理渲染了,我們這么處理:
suggestion1Stream.subscribe(function(suggestion) {
// render the 1st suggestion to the DOM
});
回到"當刷新時,清理掉當前的推薦",我們可以很簡單的把刷新點擊映射為 null
,并且在 suggestion1Stream
中包含進來,如下:
var suggestion1Stream = responseStream
.map(function(listUsers) {
// get one random user from the list
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
refreshClickStream.map(function(){ return null; })
);
當渲染時,null
解釋為"沒有數(shù)據(jù)",所以把 UI 元素隱藏起來。
suggestion1Stream.subscribe(function(suggestion) {
if (suggestion === null) {
// hide the first suggestion DOM element
}
else {
// show the first suggestion DOM element
// and render the data
}
});
現(xiàn)在的示意圖:
refreshClickStream: ----------o--------o---->
requestStream: -r--------r--------r---->
responseStream: ----R---------R------R-->
suggestion1Stream: ----s-----N---s----N-s-->
suggestion2Stream: ----q-----N---q----N-q-->
suggestion3Stream: ----t-----N---t----N-t-->
其中,N
代表了 null
作為一種補充,我們也可以在一開始的時候就渲染“空的”推薦內(nèi)容。這通過把 startWith(null)
添加到 Suggestion stream 就完成了:
var suggestion1Stream = responseStream
.map(function(listUsers) {
// get one random user from the list
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
現(xiàn)在結(jié)果是:
refreshClickStream: ----------o---------o---->
requestStream: -r--------r---------r---->
responseStream: ----R----------R------R-->
suggestion1Stream: -N--s-----N----s----N-s-->
suggestion2Stream: -N--q-----N----q----N-q-->
suggestion3Stream: -N--t-----N----t----N-t-->
還有一個功能需要實現(xiàn)。每一個推薦,都該有自己的"X"按鈕以關(guān)閉它,然后在該位置加載另一個推薦。最初的想法,點擊任何關(guān)閉按鈕時都需要發(fā)起一個新的請求:
var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
// and the same for close2Button and close3Button
var requestStream = refreshClickStream.startWith('startup click')
.merge(close1ClickStream) // we added this
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
這個沒有效果。這將會關(guān)閉并且重新加載 所有 的推薦,而不是僅僅處理我們點擊的那一個。有一些不一樣的方法可以解決,并且讓它變得更加有趣,我們可以通過復用之前的請求來解決它。API 的響應頁面有 100 個用戶,而我們僅僅使用其中的三個,所以還有很多的新數(shù)據(jù)可以使用,無須重新發(fā)起請求。
同樣的,我們用Stream的方式來思考。當點擊'close1'時,我們想要用 responseStream
最近的映射從響應列表中獲取一個隨機的用戶,如:
requestStream: --r--------------->
responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->
在 Rx* 中, 叫做連接符函數(shù)的 combineLatest
似乎實現(xiàn)了我們想要的功能。它接受兩個 Stream,A 和 B 作為輸入,當其中一個 Stream 發(fā)射一個值時, combineLatest
把最近兩個發(fā)射的值 a 和 b 從各自的 Stream 中取出并且返回一個 c = f(x,y)
,其中 f
為你定義的函數(shù)。用圖來表示更好:
stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
vvvvvvvv combineLatest(f) vvvvvvv
----AB---AC--EC---ED--ID--IQ---->
where f is the uppercase function
我們可以在 close1ClickStream
和 responseStream
上使用 combineLatest(),所以無論什么時候當一個按鈕被點擊時,我們可以獲得最新的響應發(fā)射值,并且在 suggestion1Stream
上產(chǎn)生一個新的值。另一方面,combineLatest() 是對稱的,當一個新的響應在 responseStream
發(fā)射時,它將會把最后的'關(guān)閉 1'的點擊事件一起合并來產(chǎn)生一個新的推薦。這是有趣的,因為它允許我們把之前的 suggestion1Stream
代碼簡化成下邊這個樣子:
var suggestion1Stream = close1ClickStream
.combineLatest(responseStream,
function(click, listUsers) {
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
還有一個問題需要解決。combineLatest() 使用最近的兩個數(shù)據(jù)源,但是當其中一個來源沒發(fā)起任何事件時,combineLatest() 無法在 Output stream 中產(chǎn)生一個 Data event。從上邊的 ASCII 圖中,你可以看到,當?shù)谝粋€ Stream 發(fā)射值 a
時,這個值時并沒有任何輸出產(chǎn)生,只有當?shù)诙€ Stream 發(fā)射值 b
時才有值輸出。
有多種方法可以解決這個問題,我們選擇最簡單的一種,一開始在'close 1'按鈕上模擬一個點擊事件:
var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this
.combineLatest(responseStream,
function(click, listUsers) {l
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
終于完成了,所有的代碼合在一起是這樣子:
var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
var closeButton1 = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
// and the same logic for close2 and close3
var requestStream = refreshClickStream.startWith('startup click')
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var responseStream = requestStream
.flatMap(function (requestUrl) {
return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
});
var suggestion1Stream = close1ClickStream.startWith('startup click')
.combineLatest(responseStream,
function(click, listUsers) {
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
// and the same logic for suggestion2Stream and suggestion3Stream
suggestion1Stream.subscribe(function(suggestion) {
if (suggestion === null) {
// hide the first suggestion DOM element
}
else {
// show the first suggestion DOM element
// and render the data
}
});
你可以查看這個最終效果 http://jsfiddle.net/staltz/8jFJH/48/
這段代碼雖然短小,但實現(xiàn)了不少功能:它適當?shù)氖褂?Separation of concerns 實現(xiàn)了對 Multiple events 的管理,甚至緩存了響應。函數(shù)式的風格讓代碼看起來更加 Declarative 而非 Imperative:我們并非給出一組指令去執(zhí)行,而是通過定義 Stream 之間的關(guān)系 定義這是什么 。舉個例子,我們使用 Rx 告訴計算機 *suggestion1Stream*
是 由 'close 1' Stream 與最新響應中的一個用戶合并而來,在程序剛運行或者刷新時則是 null
。
留意一下代碼中并沒有出現(xiàn)如 if
、 for
、 while
這樣的控制語句,或者一般 JavaScript 應用中典型的基于回調(diào)的控制流。如果你想使用 filter()
,上面的 subscribe()
中甚至可以不用 if
、 else
(實現(xiàn)細節(jié)留給讀者作為練習)。在 Rx 中,我們有著像 map
、 filter
、 scan
、 merge
、 combineLatest
、 startWith
這樣的 Stream 函數(shù),甚至更多類似的函數(shù)去控制一個事件驅(qū)動(Event-driven)的程序。這個工具集讓你可以用更少的代碼實現(xiàn)更多的功能。
如果你覺得 Rx* 會成為你首選的響應式編程庫,花點時間去熟悉這個big list of functions,它包括了如何轉(zhuǎn)換、合并、以及創(chuàng)建 Observable。如果你想通過圖表去理解這些函數(shù),請看 。無論什么時候你遇到問題,畫一下這些圖,思考一下,看一下這一大串函數(shù),然后繼續(xù)思考。以我個人經(jīng)驗,這樣效果很明顯。
一旦你開始使用 Rx 去編程,很有必要去理解 Cold vs Hot Observables 中的概念。如果忽略了這些,你一不小心就會被它坑了。我提醒過你了。通過學習真正的函數(shù)式編程去提升自己的技能,并熟悉那些會影響到 Rx 的問題,比如副作用。
但是響應式編程不僅僅是 Rx。還有相對容易理解的 Bacon.js,它沒有 Rx 那些怪癖。Elm Language 則以它自己的方式支持 RP:它是一門會編譯成 Javascript + HTML + CSS 的響應式編程語言 ,并有一個 time travelling debugger。非常厲害。
Rx 在需要處理大量事件的 Frontend 和 Apps 中非常有用。但它不僅僅能用在客戶端,在后端或者與數(shù)據(jù)庫交互時也非常有用。事實上,RxJava 是實現(xiàn)Netflix's API服務器端并發(fā)的一個重要組件 。Rx 并不是一個只能在某種應用或者語言中使用的 Framework。它本質(zhì)上是一個在開發(fā)任何 Event-driven 軟件中都能使用的編程范式。
如果教程幫到你了,請支持。