為了協(xié)調(diào)數(shù)據(jù)的并行序列,我們可以組合發(fā)布者
。由于生成序列是合并的結(jié)果,它們也可以用于數(shù)據(jù)的[異步轉(zhuǎn)換]異步轉(zhuǎn)換。
通過(guò)非阻塞的協(xié)調(diào)方式可以避免開發(fā)者使用 Future.get() 或 Promise.await()
,這兩個(gè)方法在多信號(hào)存在是容易引發(fā)問(wèn)題。非阻塞意味著管道除了訂閱者
的需求,不會(huì)做任何等待。訂閱者的請(qǐng)求將被切分至最小,然后分配給已經(jīng)組合的發(fā)布者
。
合并行為在 FanInAction
中建模,并通過(guò)一個(gè)訂閱者委托的線程偷取型 SerializedSubscriber
代理處理并行信號(hào)。它將對(duì)校驗(yàn)每個(gè)信號(hào),查看對(duì)應(yīng)的委托訂閱者是否已經(jīng)運(yùn)行,如果沒(méi)有運(yùn)行,則重新分配信號(hào)。當(dāng)繁忙的線程關(guān)閉訂閱者代碼時(shí),信號(hào)將被輪詢,處理信號(hào)的線程很可能已經(jīng)不再是生產(chǎn)它的那個(gè)了。
在使用
flatMap
之間就削減需求信號(hào)量 沒(méi)法說(shuō)是好主意還是壞主意。實(shí)際上,如果無(wú)法處理所有的數(shù)據(jù),是沒(méi)有必要訂閱多個(gè)并行發(fā)布者并合并操作的。然而它對(duì)并行發(fā)布者
數(shù)據(jù)量的限制,也不會(huì)給予高速發(fā)布者
掛起請(qǐng)求的機(jī)會(huì)。
Stream.zipWith(Function)
Streams
.range(1, 100)
.zipWith( Streams.generate(System::currentTimeMillis), tuple -> tuple ) //1
.consume(
tuple -> System.out.println("number: "+tuple.getT1()+" time: "+tuple.getT2()) , //2
Throwable::printStackTrace,
avoid -> System.out.println("--complete--")
);
RangeStream
的最新的信號(hào),傳遞 SupplierStream
以提供當(dāng)前時(shí)間。發(fā)布者
按照聲明的順序(自左及右,stream1.zipWith(stream2)
)生成數(shù)據(jù)元組。表13,組合數(shù)據(jù)源
http://wiki.jikexueyuan.com/project/reactor-2.0/images/33.png" alt="" />