subject
是一個(gè)神奇的對(duì)象,它可以是一個(gè)Observable同時(shí)也可以是一個(gè)Observer:它作為連接這兩個(gè)世界的一座橋梁。一個(gè)Subject可以訂閱一個(gè)Observable,就像一個(gè)觀察者,并且它可以發(fā)射新的數(shù)據(jù),或者傳遞它接受到的數(shù)據(jù),就像一個(gè)Observable。很明顯,作為一個(gè)Observable,觀察者們或者其它Subject都可以訂閱它。
一旦Subject訂閱了Observable,它將會(huì)觸發(fā)Observable開始發(fā)射。如果原始的Observable是“冷”的,這將會(huì)對(duì)訂閱一個(gè)“熱”的Observable變量產(chǎn)生影響。
RxJava提供四種不同的Subject:
Publish是Subject的一個(gè)基礎(chǔ)子類。讓我們看看用PublishSubject實(shí)現(xiàn)傳統(tǒng)的Observable Hello World
:
PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no!Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
stringPublishSubject.onNext("Hello World");
在剛才的例子中,我們創(chuàng)建了一個(gè)PublishSubject
,用create()
方法發(fā)射一個(gè)String
值,然后我們訂閱了PublishSubject。此時(shí),沒有數(shù)據(jù)要發(fā)送,因此我們的觀察者只能等待,沒有阻塞線程,也沒有消耗資源。就在這隨時(shí)準(zhǔn)備從subject接收值,如果subject沒有發(fā)射值那么我們的觀察者就會(huì)一直在等待。再次聲明的是,無(wú)需擔(dān)心:觀察者知道在每個(gè)場(chǎng)景中該做什么,我們不用擔(dān)心什么時(shí)候是因?yàn)樗琼憫?yīng)式的:系統(tǒng)會(huì)響應(yīng)。我們并不關(guān)心它什么時(shí)候響應(yīng)。我們只關(guān)心它響應(yīng)時(shí)該做什么。
最后一行代碼展示了手動(dòng)發(fā)射字符串“Hello World”,它觸發(fā)了觀察者的onNext()
方法,讓我們?cè)诳刂婆_(tái)打印出“Hello World”信息。
讓我們看一個(gè)更復(fù)雜的例子。話說(shuō)我們有一個(gè)private
聲明的Observable,外部不能訪問。Observable在它生命周期內(nèi)發(fā)射值,我們不用關(guān)心這些值,我們只關(guān)心他們的結(jié)束。
首先,我們創(chuàng)建一個(gè)新的PublishSubject來(lái)響應(yīng)它的onNext()
方法,并且外部也可以訪問它。
final PublishSubject<Boolean> subject = PublishSubject.create();
subject.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
System.out.println("Observable Completed");
}
});
然后,我們創(chuàng)建“私有”的Observable,只有subject才可以訪問的到。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
subject.onNext(true);
}
}).subscribe();
Observable.create()
方法包含了我們熟悉的for循環(huán),發(fā)射數(shù)字。doOnCompleted()
方法指定當(dāng)Observable結(jié)束時(shí)要做什么事情:在subject上發(fā)射true。最后,我們訂閱了Observable。很明顯,空的subscribe()
調(diào)用僅僅是為了開啟Observable,而不用管已發(fā)出的任何值,也不用管完成事件或者錯(cuò)誤事件。為了這個(gè)例子我們需要它像這樣。
在這個(gè)例子中,我們創(chuàng)建了一個(gè)可以連接Observables并且同時(shí)可被觀測(cè)的實(shí)體。當(dāng)我們想為公共資源創(chuàng)建獨(dú)立、抽象或更易觀測(cè)的點(diǎn)時(shí),這是極其有用的。
簡(jiǎn)單的說(shuō),BehaviorSubject會(huì)首先向他的訂閱者發(fā)送截至訂閱前最新的一個(gè)數(shù)據(jù)對(duì)象(或初始值),然后正常發(fā)送訂閱后的數(shù)據(jù)流。
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
在這個(gè)短例子中,我們創(chuàng)建了一個(gè)能發(fā)射整形(Integer)的BehaviorSubject。由于每當(dāng)Observes訂閱它時(shí)就會(huì)發(fā)射最新的數(shù)據(jù),所以它需要一個(gè)初始值。
ReplaySubject會(huì)緩存它所訂閱的所有數(shù)據(jù),向任意一個(gè)訂閱它的觀察者重發(fā):
ReplaySubject<Integer> replaySubject = ReplaySubject.create();
當(dāng)Observable完成時(shí)AsyncSubject只會(huì)發(fā)布最后一個(gè)數(shù)據(jù)給已經(jīng)訂閱的每一個(gè)觀察者。
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();