鍍金池/ 教程/ Android/ Subject = Observable + Observer
combineLatest
從列表創(chuàng)建一個(gè)Observable
RxJava的與眾不同之處
Schedulers
RxJava觀察者模式工具包
總結(jié)
工具
你什么時(shí)候使用觀察者模式?
GroupBy
App架構(gòu)
組合Observables
創(chuàng)建Activity類
StartWith
RX - 從.NET到RxJava
處理耗時(shí)的任務(wù)
過(guò)濾Observables
向響應(yīng)式世界問好
避免阻塞I/O的操作
Join
有且僅有一次
Schedulers-解決Android主線程問題
轉(zhuǎn)換Observables
啟動(dòng)引擎
我們的第一個(gè)Observable
總結(jié)
StrictMode
Debounce
*map家族
創(chuàng)建RecyclerView Adapter
為什么是Observables?
Merge
再多幾個(gè)例子
總結(jié)
Buffer
Window
總結(jié)
Timeout
執(zhí)行網(wǎng)絡(luò)任務(wù)
項(xiàng)目目標(biāo)
來(lái)到Java世界 - Netflix RxJava
獲取我們需要的數(shù)據(jù)
Observable
過(guò)濾序列
非阻塞I/O操作
ZIP
總結(jié)
And,Then和When
觀察者模式
Retrofit
Cast
Skip and SkipLast
微軟響應(yīng)式擴(kuò)展
與REST無(wú)縫結(jié)合-RxJava和Retrofit
First and last
RxJava Essentials 中文翻譯版
Switch
ElementAt
總結(jié)
總結(jié)
總結(jié)
Sampling
SubscribeOn and ObserveOn
Subject = Observable + Observer

Subject = Observable + Observer

subject是一個(gè)神奇的對(duì)象,它可以是一個(gè)Observable同時(shí)也可以是一個(gè)Observer:它作為連接這兩個(gè)世界的一座橋梁。一個(gè)Subject可以訂閱一個(gè)Observable,就像一個(gè)觀察者,并且它可以發(fā)射新的數(shù)據(jù),或者傳遞它接受到的數(shù)據(jù),就像一個(gè)Observable。很明顯,作為一個(gè)Observable,觀察者們或者其它Subject都可以訂閱它。

一旦Subject訂閱了Observable,它將會(huì)觸發(fā)Observable開始發(fā)射。如果原始的Observable是“冷”的,這將會(huì)對(duì)訂閱一個(gè)“熱”的Observable變量產(chǎn)生影響。

RxJava提供四種不同的Subject:

  • PublishSubject
  • BehaviorSubject
  • ReplaySubject.
  • AsyncSubject

PublishSubject

Publish是Subject的一個(gè)基礎(chǔ)子類。讓我們看看用PublishSubject實(shí)現(xiàn)傳統(tǒng)的Observable Hello World:

PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Oh,no!Something wrong happened!");                
    }

    @Override
    public void onNext(String message) {
        System.out.println(message);
    }
});
stringPublishSubject.onNext("Hello World");

在剛才的例子中,我們創(chuàng)建了一個(gè)PublishSubject,用create()方法發(fā)射一個(gè)String值,然后我們訂閱了PublishSubject。此時(shí),沒有數(shù)據(jù)要發(fā)送,因此我們的觀察者只能等待,沒有阻塞線程,也沒有消耗資源。就在這隨時(shí)準(zhǔn)備從subject接收值,如果subject沒有發(fā)射值那么我們的觀察者就會(huì)一直在等待。再次聲明的是,無(wú)需擔(dān)心:觀察者知道在每個(gè)場(chǎng)景中該做什么,我們不用擔(dān)心什么時(shí)候是因?yàn)樗琼憫?yīng)式的:系統(tǒng)會(huì)響應(yīng)。我們并不關(guān)心它什么時(shí)候響應(yīng)。我們只關(guān)心它響應(yīng)時(shí)該做什么。

最后一行代碼展示了手動(dòng)發(fā)射字符串“Hello World”,它觸發(fā)了觀察者的onNext()方法,讓我們?cè)诳刂婆_(tái)打印出“Hello World”信息。

讓我們看一個(gè)更復(fù)雜的例子。話說(shuō)我們有一個(gè)private聲明的Observable,外部不能訪問。Observable在它生命周期內(nèi)發(fā)射值,我們不用關(guān)心這些值,我們只關(guān)心他們的結(jié)束。

首先,我們創(chuàng)建一個(gè)新的PublishSubject來(lái)響應(yīng)它的onNext()方法,并且外部也可以訪問它。

final PublishSubject<Boolean> subject = PublishSubject.create();

subject.subscribe(new Observer<Boolean>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Boolean aBoolean) {
        System.out.println("Observable Completed");
    }
});

然后,我們創(chuàng)建“私有”的Observable,只有subject才可以訪問的到。

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).doOnCompleted(new Action0() {
    @Override
    public void call() {
        subject.onNext(true);
    }
}).subscribe();

Observable.create()方法包含了我們熟悉的for循環(huán),發(fā)射數(shù)字。doOnCompleted()方法指定當(dāng)Observable結(jié)束時(shí)要做什么事情:在subject上發(fā)射true。最后,我們訂閱了Observable。很明顯,空的subscribe()調(diào)用僅僅是為了開啟Observable,而不用管已發(fā)出的任何值,也不用管完成事件或者錯(cuò)誤事件。為了這個(gè)例子我們需要它像這樣。

在這個(gè)例子中,我們創(chuàng)建了一個(gè)可以連接Observables并且同時(shí)可被觀測(cè)的實(shí)體。當(dāng)我們想為公共資源創(chuàng)建獨(dú)立、抽象或更易觀測(cè)的點(diǎn)時(shí),這是極其有用的。

BehaviorSubject

簡(jiǎn)單的說(shuō),BehaviorSubject會(huì)首先向他的訂閱者發(fā)送截至訂閱前最新的一個(gè)數(shù)據(jù)對(duì)象(或初始值),然后正常發(fā)送訂閱后的數(shù)據(jù)流。

BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);

在這個(gè)短例子中,我們創(chuàng)建了一個(gè)能發(fā)射整形(Integer)的BehaviorSubject。由于每當(dāng)Observes訂閱它時(shí)就會(huì)發(fā)射最新的數(shù)據(jù),所以它需要一個(gè)初始值。

ReplaySubject

ReplaySubject會(huì)緩存它所訂閱的所有數(shù)據(jù),向任意一個(gè)訂閱它的觀察者重發(fā):

ReplaySubject<Integer> replaySubject = ReplaySubject.create();

AsyncSubject

當(dāng)Observable完成時(shí)AsyncSubject只會(huì)發(fā)布最后一個(gè)數(shù)據(jù)給已經(jīng)訂閱的每一個(gè)觀察者。

AsyncSubject<Integer> asyncSubject = AsyncSubject.create();