鍍金池/ 教程/ Java/ 背壓和溢出
異步 TCP、UDP 及 HTTP
微批處理
編解碼器與緩沖區(qū)
組合多個服務(wù)調(diào)用
環(huán)境與調(diào)度者
環(huán)形緩沖區(qū)處理者
Stream 數(shù)據(jù)持久化
函數(shù)式功能
reactor-stream
概述
背壓和溢出
核心處理者
數(shù)據(jù)路由
響應(yīng)式數(shù)據(jù)流
什么是 Reactor?
支持響應(yīng)式背壓
微服務(wù)
組合操作
使用 Stream 和 Promise(約定) 協(xié)調(diào)任務(wù)
Reactor 介紹
分區(qū)
發(fā)布訂閱模型
錯誤處理
分析
核心概述
創(chuàng)建非阻塞服務(wù)
使用緩沖區(qū)
請求應(yīng)答模式
關(guān)于該項目
理解線程模型
reactor-核心
響應(yīng)式擴(kuò)展
Streams 的基礎(chǔ)知識
構(gòu)架總覽
Rx 之外的其它 API
注冊表
使用窗口
使用前提

背壓和溢出

多數(shù)情況下,依照 Reactor Stream 的協(xié)定,背壓可以被自動處理。如果訂閱者(Subscriber)請求的數(shù)據(jù)并沒有超過其處理能力(例如類似 Long.MAX_VALUE 的東西),數(shù)據(jù)源上游可以避免發(fā)送過多數(shù)據(jù)。如果你想在使用一個 “冷”的發(fā)布者(Publisher) 時享受這種便利,你必須可以在任何時候關(guān)閉數(shù)據(jù)源的讀取操作:從 socket 中讀取多少、SQL 查詢指針中有多少行、文件中有多少行、迭代構(gòu)造體中有多少元素…… 如果是數(shù)據(jù)源例如定時器或 UI 事件,或是一個可能從大型數(shù)據(jù)集上請求 Long.Max_VALUE 大小數(shù)據(jù)的訂閱者(Subscriber),開發(fā)者必須針對背壓**制定明確的策略。

Reactor 提供了一系列處理冷熱序列的 API

  • 非控(熱)序列應(yīng)當(dāng)主動管理。
  • 減少 序列的信號量,例如“取樣”。
  • 當(dāng)需求超過容量時,忽略 數(shù)據(jù)。
  • 當(dāng)需求超過容量時,緩沖 數(shù)據(jù)。
  • 受控(冷)序列應(yīng)當(dāng)被動管理。
  • 通過降低來自訂閱者(Subscriber)或 Stream 上任意點的需求。
  • 通過延遲請求斷歇需求。

Reactor 擴(kuò)展文檔中應(yīng)用最廣泛的示例就是 Marble Diagram,雙時間線幫助我們更直觀的了解發(fā)布者(Publisher)、Stream以及訂閱者(Subscriber) (如Action)在何時被觀察,觀察的內(nèi)容又是什么。我們將使用這些圖表來強調(diào)需求流,表明例如 Map 和 filter 這樣的變換的本質(zhì)。

http://wiki.jikexueyuan.com/project/reactor-2.0/images/30.png" alt="" />

當(dāng)兩個 Action 的調(diào)度器或容量不同時,Reactor 將自動提供一個內(nèi)存溢出緩沖區(qū)。這不適用于核心處理器,它有自己的溢出處理機(jī)制。調(diào)度器可以重復(fù)使用,且 Reactor 必須限制調(diào)度器的數(shù)量,因此 Action 的調(diào)度器不同時,將添加內(nèi)存緩沖區(qū)。

Streams.just(1,2,3,4,5)
  .buffer(3) //1
  //onOverflowBuffer()
  .capacity(2) //2
  .consume()

Streams.just(1,2,3,4,5)
  .dispatchOn(dispatcher1) //3
  //onOverflowBuffer()
  .dispatchOn(dispatcher2) //4
  .consume()
  1. buffer 操作設(shè)定容量為 3。
  2. consume() 或任何下游動作都被設(shè)定為 capacity(2),隱式的添加了一個 onOverflowBuffer()。
  3. 在調(diào)度器 1 上執(zhí)行第一個動作。
  4. 在調(diào)度器 2 上執(zhí)行第二個動作,隱式的添加了一個 onOverflowBuffer()。

最終 Subscriber 可以逐一的請求數(shù)據(jù),限制管道中傳輸?shù)臄?shù)據(jù)為一個元素,并在每次成功調(diào)用 onNext(T) 后請求下一個元素。這種行為也可以通過 capacity(1).consume(...) 獲得。

Streams.range(1,1000000)
  .subscribe(new DefaultSubscriber<Long>(){ 
    Subscription sub;

    @Override
    void onSubscribe(Subscription sub){
      this.sub = sub;
      sub.request(1); 
    }

    @Override
    void onNext(Long n){
      httpClient.get("localhost/"+n).onSuccess(rep -> sub.request(1)); 
    }
  );
  1. 使用 DefaultSubscriber 以避免逐個實現(xiàn)訂閱者的所有方法。
  2. 持有訂閱的指針后安排第一次需求請求。
  3. 在成功的 GET 請求后,使用 異步 HTTP API 再次請求。延遲信息自然將被傳遞給 RangeStreamPublisher。你可以想到,通過計算兩次請求的時間間隔,我們將能夠深入的了解執(zhí)行過程及 IO 操作所產(chǎn)生的延遲。

表 12,控制傳遞數(shù)據(jù)的信號量

http://wiki.jikexueyuan.com/project/reactor-2.0/images/31.png" alt="" />

http://wiki.jikexueyuan.com/project/reactor-2.0/images/32.png" alt="" />

上一篇:關(guān)于該項目下一篇:使用前提