鍍金池/ 教程/ Android/ Observable
調(diào)度器 Scheduler
Empty/Never/Throw
Replay
這個頁面展示了創(chuàng)建Observable的各種方法。
ObserveOn
ReactiveX
TimeInterval
Window
本頁展示的操作符用于對整個序列執(zhí)行算法操作或其它操作,由于這些操作必須等待數(shù)據(jù)發(fā)射完成(通常也必須緩存這些數(shù)據(jù)),它們對于非常長
IgnoreElements
Distinct
Last
Start
And/Then/When
Switch
創(chuàng)建操作
Materialize/Dematerialize
CombineLatest
Catch
實現(xiàn)自己的操作符
StringObservable
Map
ConnectableObservable
Using
Take
BlockingObservable
TakeLast
Defer
RxJavaSchedulersHook
First
FlatMap
這個頁面的操作符可用于根據(jù)條件發(fā)射或變換Observables,或者對它們做布爾運算:
Do
Repeat
Serialize
這個頁面展示的操作符可用于過濾和選擇Observable發(fā)射的數(shù)據(jù)序列。
這個頁面列出了很多用于Observable的輔助操作符
Single
Retry
從錯誤中恢復(fù)的技術(shù)
Sample
Merge
算術(shù)和聚合操作
Range
Timestamp
RxJava Issues
From
Subscribe
Subject
Delay
Skip
SubscribeOn
Filter
按字母順序排列的全部操作符列表
Timeout
Scan
onError
Zip
RxJava文檔和教程
Publish
ElementAt
第一個例子
SkipLast
Just
Timer
Debounce
GroupBy
條件和布爾操作
這個頁面展示了可用于對Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作的各種操作符。
Introduction
rxjava-async
介紹響應(yīng)式編程
這個頁面展示的操作符可用于組合多個Observables。
ReactiveX
Connect
操作符分類
StartWith
Interval
Join
To
Buffer
RefCount
介紹
Observable

Observable

概述

在ReactiveX中,一個觀察者(Observer)訂閱一個可觀察對象(Observable)。觀察者對Observable發(fā)射的數(shù)據(jù)或數(shù)據(jù)序列作出響應(yīng)。這種模式可以極大地簡化并發(fā)操作,因為它創(chuàng)建了一個處于待命狀態(tài)的觀察者哨兵,在未來某個時刻響應(yīng)Observable的通知,不需要阻塞等待Observable發(fā)射數(shù)據(jù)。

這篇文章會解釋什么是響應(yīng)式編程模式(reactive pattern),以及什么是可觀察對象(Observables)和觀察者(observers),其它幾篇文章會展示如何用操作符組合和改變Observable的行為。

http://wiki.jikexueyuan.com/project/rx-docs/images/legend.png" alt="Observable" />

相關(guān)參考:

  • Single - 一個特殊的Observable,只發(fā)射單個數(shù)據(jù)。

背景知識

在很多軟件編程任務(wù)中,或多或少你都會期望你寫的代碼能按照編寫的順序,一次一個的順序執(zhí)行和完成。但是在ReactiveX中,很多指令可能是并行執(zhí)行的,之后他們的執(zhí)行結(jié)果才會被觀察者捕獲,順序是不確定的。為達到這個目的,你定義一種獲取和變換數(shù)據(jù)的機制,而不是調(diào)用一個方法。在這種機制下,存在一個可觀察對象(Observable),觀察者(Observer)訂閱(Subscribe)它,當數(shù)據(jù)就緒時,之前定義的機制就會分發(fā)數(shù)據(jù)給一直處于等待狀態(tài)的觀察者哨兵。

這種方法的優(yōu)點是,如果你有大量的任務(wù)要處理,它們互相之間沒有依賴關(guān)系。你可以同時開始執(zhí)行它們,不用等待一個完成再開始下一個(用這種方式,你的整個任務(wù)隊列能耗費的最長時間,不會超過任務(wù)里最耗時的那個)。

有很多術(shù)語可用于描述這種異步編程和設(shè)計模式,在在本文里我們使用這些術(shù)語:一個觀察者訂閱一個可觀察對象 (An observer subscribes to an Observable)。通過調(diào)用觀察者的方法,Observable發(fā)射數(shù)據(jù)或通知給它的觀察者。

在其它的文檔和場景里,有時我們也將Observer叫做Subscriber、WatcherReactor。這個模型通常被稱作Reactor模式。

創(chuàng)建觀察者

本文使用類似于Groovy的偽代碼舉例,但是ReactiveX有多種語言的實現(xiàn)。

普通的方法調(diào)用(不是某種異步方法,也不是Rx中的并行調(diào)用),流程通常是這樣的:

  1. 調(diào)用某一個方法
  2. 用一個變量保存方法返回的結(jié)果
  3. 使用這個變量和它的新值做些有用的事

用代碼描述就是:

// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal

在異步模型中流程更像這樣的:

  1. 定義一個方法,它完成某些任務(wù),然后從異步調(diào)用中返回一個值,這個方法是觀察者的一部分
  2. 將這個異步調(diào)用本身定義為一個Observable
  3. 觀察者通過訂閱(Subscribe)操作關(guān)聯(lián)到那個Observable
  4. 繼續(xù)你的業(yè)務(wù)邏輯,等方法返回時,Observable會發(fā)射結(jié)果,觀察者的方法會開始處理結(jié)果或結(jié)果集

用代碼描述就是:


// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business

回調(diào)方法 (onNext, onCompleted, onError)

Subscribe方法用于將觀察者連接到Observable,你的觀察者需要實現(xiàn)以下方法的一個子集:

  • onNext(T item)

    Observable調(diào)用這個方法發(fā)射數(shù)據(jù),方法的參數(shù)就是Observable發(fā)射的數(shù)據(jù),這個方法可能會被調(diào)用多次,取決于你的實現(xiàn)。

  • onError(Exception ex)

    當Observable遇到錯誤或者無法返回期望的數(shù)據(jù)時會調(diào)用這個方法,這個調(diào)用會終止Observable,后續(xù)不會再調(diào)用onNext和onCompleted,onError方法的參數(shù)是拋出的異常。

  • onComplete

    正常終止,如果沒有遇到錯誤,Observable在最后一次調(diào)用onNext之后調(diào)用此方法。

根據(jù)Observable協(xié)議的定義,onNext可能會被調(diào)用零次或者很多次,最后會有一次onCompleted或onError調(diào)用(不會同時),傳遞數(shù)據(jù)給onNext通常被稱作發(fā)射,onCompleted和onError被稱作通知。

下面是一個更完整的例子:


def myOnNext     = { item -> /* do something useful with item */ };
def myError      = { throwable -> /* react sensibly to a failed call */ };
def myComplete   = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business

取消訂閱 (Unsubscribing)

在一些ReactiveX實現(xiàn)中,有一個特殊的觀察者接口Subscriber,它有一個unsubscribe方法。調(diào)用這個方法表示你不關(guān)心當前訂閱的Observable了,因此Observable可以選擇停止發(fā)射新的數(shù)據(jù)項(如果沒有其它觀察者訂閱)。

取消訂閱的結(jié)果會傳遞給這個Observable的操作符鏈,而且會導(dǎo)致這個鏈條上的每個環(huán)節(jié)都停止發(fā)射數(shù)據(jù)項。這些并不保證會立即發(fā)生,然而,對一個Observable來說,即使沒有觀察者了,它也可以在一個while循環(huán)中繼續(xù)生成并嘗試發(fā)射數(shù)據(jù)項。

關(guān)于命名約定

ReactiveX的每種特定語言的實現(xiàn)都有自己的命名偏好,雖然不同的實現(xiàn)之間有很多共同點,但并不存在一個統(tǒng)一的命名標準。

而且,在某些場景中,一些名字有不同的隱含意義,或者在某些語言看來比較怪異。

例如,有一個onEvent命名模式(onNext, onCompleted, onError),在一些場景中,這些名字可能意味著事件處理器已經(jīng)注冊。然而在ReactiveX里,他們是事件處理器的名字。

Observables的"熱"和"冷"

Observable什么時候開始發(fā)射數(shù)據(jù)序列?這取決于Observable的實現(xiàn),一個"熱"的Observable可能一創(chuàng)建完就開始發(fā)射數(shù)據(jù),因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯過了)。一個"冷"的Observable會一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù),因此這個觀察者可以確保會收到整個數(shù)據(jù)序列。

在一些ReactiveX實現(xiàn)里,還存在一種被稱作Connectable的Observable,不管有沒有觀察者訂閱它,這種Observable都不會開始發(fā)射數(shù)據(jù),除非Connect方法被調(diào)用。

用操作符組合Observable

對于ReactiveX來說,Observable和Observer僅僅是個開始,它們本身不過是標準觀察者模式的一些輕量級擴展,目的是為了更好的處理事件序列。

ReactiveX真正強大的地方在于它的操作符,操作符讓你可以變換、組合、操縱和處理Observable發(fā)射的數(shù)據(jù)。

Rx的操作符讓你可以用聲明式的風格組合異步操作序列,它擁有回調(diào)的所有效率優(yōu)勢,同時又避免了典型的異步系統(tǒng)中嵌套回調(diào)的缺點。

下面是常用的操作符列表:

  1. 創(chuàng)建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. 變換操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
  3. 過濾操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  4. 組合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. 錯誤處理 Catch和Retry
  6. 輔助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  7. 條件和布爾操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  8. 算術(shù)和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
  9. 轉(zhuǎn)換操作 To
  10. 連接操作 Connect, Publish, RefCount, Replay
  11. 反壓操作,用于增加特殊的流程控制策略的操作符

這些操作符并不全都是ReactiveX的核心組成部分,有一些是語言特定的實現(xiàn)或可選的模塊。

RxJava

在RxJava中,一個實現(xiàn)了_Observer_接口的對象可以訂閱(subscribe)一個Observable 類的實例。訂閱者(subscriber)對Observable發(fā)射(emit)的任何數(shù)據(jù)或數(shù)據(jù)序列作出響應(yīng)。這種模式簡化了并發(fā)操作,因為它不需要阻塞等待Observable發(fā)射數(shù)據(jù),而是創(chuàng)建了一個處于待命狀態(tài)的觀察者哨兵,哨兵在未來某個時刻響應(yīng)Observable的通知。

上一篇:Timeout下一篇:Connect