鍍金池/ 教程/ Java/ 分區(qū)
異步 TCP、UDP 及 HTTP
微批處理
編解碼器與緩沖區(qū)
組合多個服務(wù)調(diào)用
環(huán)境與調(diào)度者
環(huán)形緩沖區(qū)處理者
Stream 數(shù)據(jù)持久化
函數(shù)式功能
reactor-stream
概述
背壓和溢出
核心處理者
數(shù)據(jù)路由
響應(yīng)式數(shù)據(jù)流
什么是 Reactor?
支持響應(yīng)式背壓
微服務(wù)
組合操作
使用 Stream 和 Promise(約定) 協(xié)調(diào)任務(wù)
Reactor 介紹
分區(qū)
發(fā)布訂閱模型
錯誤處理
分析
核心概述
創(chuàng)建非阻塞服務(wù)
使用緩沖區(qū)
請求應(yīng)答模式
關(guān)于該項目
理解線程模型
reactor-核心
響應(yīng)式擴展
Streams 的基礎(chǔ)知識
構(gòu)架總覽
Rx 之外的其它 API
注冊表
使用窗口
使用前提

分區(qū)

分區(qū)(Partition)是一種針對并行、并發(fā)作業(yè)的 Stream。

以響應(yīng)式編程的方式編寫的功能組件,有一個重要的方面就是它的工作可以審慎的切塊,交由任意調(diào)度器完成。這意味著你可以很輕松的將輸入值組合沖一個工作流——在另一個線程執(zhí)行操作,然后當結(jié)果可用時將其交給子序列,完成轉(zhuǎn)化。這是 Reactor 很常見的使用模式。

DispatcherSupplier supplier1 = Environment.newCachedDispatchers(2, "groupByPool");
DispatcherSupplier supplier2 = Environment.newCachedDispatchers(5, "partitionPool");

Streams
    .range(1, 10)
    .groupBy(n -> n % 2 == 0) //1
    .flatMap(stream -> stream
            .dispatchOn(supplier1.get()) //2
            .log("groupBy")
    )
    .partition(5) //3
    .flatMap(stream -> stream
            .dispatchOn(supplier2.get()) //4
            .log("partition")
    )
    .dispatchOn(Environment.sharedDispatcher()) //5
    .log("join")
    .consume();
  1. 創(chuàng)建至多兩個(奇/偶)數(shù)據(jù)流,以鍵值 0 或 1 標記,并將 onNext(T) 信號分發(fā)給匹配的數(shù)據(jù)流。
  2. 使用前面的 GroupByAction,為兩個正在發(fā)送的 Stream 添加一個已經(jīng)生成好的調(diào)度器。通過像這樣使用分配于各自調(diào)度器的兩個分區(qū),數(shù)據(jù)流得到了有效的擴充。FlatMap 將合并兩個分區(qū)的返回值,這個過程運行在兩個線程之一,但絕不會并行處理。
  3. 創(chuàng)建 5 個分區(qū),并將 onNext(T) 信號以循環(huán)的方式分發(fā)給它們。
  4. 使用第二個調(diào)度器分配新生成的數(shù)據(jù)流。返回的序列將被合并。
  5. 使用 Environment.sharedDispatcher() 而不是前兩個線程池分派數(shù)據(jù)。 五個線程將在 Dispatcher 線程合并。

提取輸出

03:53:42.060 [groupByPool-3] INFO  groupBy - onNext: 4
03:53:42.060 [partitionPool-8] INFO  partition - onNext: 9
03:53:42.061 [groupByPool-3] INFO  groupBy - onNext: 6
03:53:42.061 [partitionPool-8] INFO  partition - onNext: 4
03:53:42.061 [shared-1] INFO  join - onNext: 9
03:53:42.061 [groupByPool-3] INFO  groupBy - onNext: 8
03:53:42.061 [partitionPool-4] INFO  partition - onNext: 6
03:53:42.061 [shared-1] INFO  join - onNext: 4
03:53:42.061 [groupByPool-3] INFO  groupBy - onNext: 10
03:53:42.061 [shared-1] INFO  join - onNext: 6
03:53:42.061 [groupByPool-3] INFO  groupBy - complete: DispatcherAction

http://wiki.jikexueyuan.com/project/reactor-2.0/images/41.png" alt="" />