鍍金池/ 教程/ Java/ 組合操作
異步 TCP、UDP 及 HTTP
微批處理
編解碼器與緩沖區(qū)
組合多個(gè)服務(wù)調(diào)用
環(huán)境與調(diào)度者
環(huán)形緩沖區(qū)處理者
Stream 數(shù)據(jù)持久化
函數(shù)式功能
reactor-stream
概述
背壓和溢出
核心處理者
數(shù)據(jù)路由
響應(yīng)式數(shù)據(jù)流
什么是 Reactor?
支持響應(yīng)式背壓
微服務(wù)
組合操作
使用 Stream 和 Promise(約定) 協(xié)調(diào)任務(wù)
Reactor 介紹
分區(qū)
發(fā)布訂閱模型
錯(cuò)誤處理
分析
核心概述
創(chuàng)建非阻塞服務(wù)
使用緩沖區(qū)
請(qǐng)求應(yīng)答模式
關(guān)于該項(xiàng)目
理解線程模型
reactor-核心
響應(yīng)式擴(kuò)展
Streams 的基礎(chǔ)知識(shí)
構(gòu)架總覽
Rx 之外的其它 API
注冊(cè)表
使用窗口
使用前提

組合操作

為了協(xié)調(diào)數(shù)據(jù)的并行序列,我們可以組合發(fā)布者。由于生成序列是合并的結(jié)果,它們也可以用于數(shù)據(jù)的[異步轉(zhuǎn)換]異步轉(zhuǎn)換。

通過(guò)非阻塞的協(xié)調(diào)方式可以避免開發(fā)者使用 Future.get() 或 Promise.await(),這兩個(gè)方法在多信號(hào)存在是容易引發(fā)問(wèn)題。非阻塞意味著管道除了訂閱者的需求,不會(huì)做任何等待。訂閱者的請(qǐng)求將被切分至最小,然后分配給已經(jīng)組合的發(fā)布者。

合并行為在 FanInAction 中建模,并通過(guò)一個(gè)訂閱者委托的線程偷取型 SerializedSubscriber 代理處理并行信號(hào)。它將對(duì)校驗(yàn)每個(gè)信號(hào),查看對(duì)應(yīng)的委托訂閱者是否已經(jīng)運(yùn)行,如果沒(méi)有運(yùn)行,則重新分配信號(hào)。當(dāng)繁忙的線程關(guān)閉訂閱者代碼時(shí),信號(hào)將被輪詢,處理信號(hào)的線程很可能已經(jīng)不再是生產(chǎn)它的那個(gè)了。

在使用 flatMap 之間就削減需求信號(hào)量 沒(méi)法說(shuō)是好主意還是壞主意。實(shí)際上,如果無(wú)法處理所有的數(shù)據(jù),是沒(méi)有必要訂閱多個(gè)并行發(fā)布者并合并操作的。然而它對(duì)并行發(fā)布者數(shù)據(jù)量的限制,也不會(huì)給予高速發(fā)布者掛起請(qǐng)求的機(jī)會(huì)。

Stream.zipWith(Function)

Streams
  .range(1, 100)
  .zipWith( Streams.generate(System::currentTimeMillis), tuple -> tuple ) //1
  .consume(
    tuple -> System.out.println("number: "+tuple.getT1()+" time: "+tuple.getT2()) , //2
    Throwable::printStackTrace,
    avoid -> System.out.println("--complete--")
  );
  1. “Zip” 或聚合來(lái)自 RangeStream 的最新的信號(hào),傳遞 SupplierStream 以提供當(dāng)前時(shí)間。
  2. 通過(guò) “Zip” 操作,壓縮發(fā)布者按照聲明的順序(自左及右,stream1.zipWith(stream2))生成數(shù)據(jù)元組。

表13,組合數(shù)據(jù)源

http://wiki.jikexueyuan.com/project/reactor-2.0/images/33.png" alt="" />