多數(shù)情況下,依照 Reactor Stream 的協(xié)定,背壓可以被自動處理。如果訂閱者(Subscriber)
請求的數(shù)據(jù)并沒有超過其處理能力(例如類似 Long.MAX_VALUE
的東西),數(shù)據(jù)源上游可以避免發(fā)送過多數(shù)據(jù)。如果你想在使用一個 “冷”的發(fā)布者(Publisher)
時享受這種便利,你必須可以在任何時候關(guān)閉數(shù)據(jù)源的讀取操作:從 socket 中讀取多少、SQL 查詢指針中有多少行、文件中有多少行、迭代構(gòu)造體中有多少元素……
如果是 熱 數(shù)據(jù)源,例如定時器或 UI 事件,或是一個可能從大型數(shù)據(jù)集上請求 Long.Max_VALUE 大小數(shù)據(jù)的訂閱者(Subscriber),開發(fā)者必須針對背壓**制定明確的策略。
Reactor 提供了一系列處理冷熱序列的 API
- 非控(熱)序列應(yīng)當(dāng)主動管理。
- 減少 序列的信號量,例如“取樣”。
- 當(dāng)需求超過容量時,忽略 數(shù)據(jù)。
- 當(dāng)需求超過容量時,緩沖 數(shù)據(jù)。
- 受控(冷)序列應(yīng)當(dāng)被動管理。
- 通過降低來自訂閱者(Subscriber)或 Stream 上任意點的需求。
- 通過延遲請求斷歇需求。
Reactor 擴(kuò)展文檔中應(yīng)用最廣泛的示例就是 Marble Diagram,雙時間線幫助我們更直觀的了解發(fā)布者(Publisher)、Stream以及訂閱者(Subscriber) (如Action)在何時被觀察,觀察的內(nèi)容又是什么。我們將使用這些圖表來強調(diào)需求流,表明例如 Map 和 filter 這樣的變換的本質(zhì)。
http://wiki.jikexueyuan.com/project/reactor-2.0/images/30.png" alt="" />
當(dāng)兩個 Action 的調(diào)度器或容量不同時,Reactor 將自動提供一個內(nèi)存溢出緩沖區(qū)。這不適用于核心處理器,它有自己的溢出處理機(jī)制。調(diào)度器可以重復(fù)使用,且 Reactor 必須限制調(diào)度器的數(shù)量,因此 Action 的調(diào)度器不同時,將添加內(nèi)存緩沖區(qū)。
Streams.just(1,2,3,4,5)
.buffer(3) //1
//onOverflowBuffer()
.capacity(2) //2
.consume()
Streams.just(1,2,3,4,5)
.dispatchOn(dispatcher1) //3
//onOverflowBuffer()
.dispatchOn(dispatcher2) //4
.consume()
最終 Subscriber
可以逐一的請求數(shù)據(jù),限制管道中傳輸?shù)臄?shù)據(jù)為一個元素,并在每次成功調(diào)用 onNext(T)
后請求下一個元素。這種行為也可以通過 capacity(1).consume(...)
獲得。
Streams.range(1,1000000)
.subscribe(new DefaultSubscriber<Long>(){
Subscription sub;
@Override
void onSubscribe(Subscription sub){
this.sub = sub;
sub.request(1);
}
@Override
void onNext(Long n){
httpClient.get("localhost/"+n).onSuccess(rep -> sub.request(1));
}
);
DefaultSubscriber
以避免逐個實現(xiàn)訂閱者的所有方法。RangeStreamPublisher
。你可以想到,通過計算兩次請求的時間間隔,我們將能夠深入的了解執(zhí)行過程及 IO 操作所產(chǎn)生的延遲。表 12,控制傳遞數(shù)據(jù)的信號量
http://wiki.jikexueyuan.com/project/reactor-2.0/images/31.png" alt="" />
http://wiki.jikexueyuan.com/project/reactor-2.0/images/32.png" alt="" />