鍍金池/ 教程/ Java/ 請(qǐng)求應(yīng)答模式
異步 TCP、UDP 及 HTTP
微批處理
編解碼器與緩沖區(qū)
組合多個(gè)服務(wù)調(diào)用
環(huán)境與調(diào)度者
環(huán)形緩沖區(qū)處理者
Stream 數(shù)據(jù)持久化
函數(shù)式功能
reactor-stream
概述
背壓和溢出
核心處理者
數(shù)據(jù)路由
響應(yīng)式數(shù)據(jù)流
什么是 Reactor?
支持響應(yīng)式背壓
微服務(wù)
組合操作
使用 Stream 和 Promise(約定) 協(xié)調(diào)任務(wù)
Reactor 介紹
分區(qū)
發(fā)布訂閱模型
錯(cuò)誤處理
分析
核心概述
創(chuàng)建非阻塞服務(wù)
使用緩沖區(qū)
請(qǐng)求應(yīng)答模式
關(guān)于該項(xiàng)目
理解線(xiàn)程模型
reactor-核心
響應(yīng)式擴(kuò)展
Streams 的基礎(chǔ)知識(shí)
構(gòu)架總覽
Rx 之外的其它 API
注冊(cè)表
使用窗口
使用前提

請(qǐng)求應(yīng)答模式

EventBus發(fā)布和響應(yīng)事件使用的是請(qǐng)求應(yīng)答模式。

常見(jiàn)的情景是,你希望能夠從運(yùn)行在 EventBus 配置好的調(diào)度器(Dispatcher)中的任務(wù)里獲取應(yīng)答。Reactor 的 EventBus提供了比簡(jiǎn)單的發(fā)布訂閱模型更全面的事件處理模型。除了 Cunsumer,你也可以同樣注冊(cè)一個(gè)函數(shù),EventBus會(huì)自動(dòng)將 Function的返回值推送給 replyTo 主鍵中的主題。在這里,推薦使用 .receive() 和 .send() 方法,而不是 .on().notify() 方法。

請(qǐng)求應(yīng)答

EventBus bus;

bus.on($("reply.sink"), ev -> {
  System.out.printf("Got %s on thread %s%n", ev, Thread.currentThread())
}); 

bus.receive($("job.sink"), ev -> {
  return doWork(ev);
}); 

bus.send("job.sink", Event.wrap("Hello World!", "reply.sink")); 
  1. 分配一個(gè)處理所有應(yīng)答的 consumer,不進(jìn)行任何分析。
  2. 分配一個(gè)工作在Dispatcher 線(xiàn)程的 Function,完成工作并返回結(jié)果。
  3. 使用給定的 replyTo 主鍵在總線(xiàn)中發(fā)布Event

如果沒(méi)有一個(gè)發(fā)布應(yīng)答的通用主題,你可以將請(qǐng)求和應(yīng)答的操作綁定到一個(gè)單獨(dú)的對(duì) .sendAndReceive(Object, Event<?>, Consumer<Event<?>>)方法的調(diào)用中。此方法將調(diào)用 .send(),并在函數(shù)被調(diào)用時(shí)在 Dispatcher 線(xiàn)程調(diào)用給定的 replyTo回調(diào)函數(shù)。

sendAndReceive()

EventBus bus;

bus.receive($("job.sink"), (Event<String> ev) -> {
  return ev.getData().toUpperCase();
}); 

bus.sendAndReceive(
    "job.sink",
   Event.wrap("Hello World!"),
   s -> System.out.printf("Got %s on thread %s%n", s, Thread.currentThread())
); 
  1. 分配一個(gè)在 Dispatcher 線(xiàn)程完成工作并返回結(jié)果的 Function。
  2. 在總線(xiàn)中發(fā)布一個(gè)Event,并在 Dispatcher 中安排給定的 replyTo Consumer,將接收事件的函數(shù)的返回值作為輸入傳遞給它。

取消任務(wù)

有時(shí)候你希望取消一個(gè)任務(wù),停止響應(yīng)事件通知。注冊(cè)函數(shù).on().receive()將返回一個(gè) Registration對(duì)象,如果持有該對(duì)象的引用,你可以用它取消給定SelectorConsumerFunction

EventBus bus;

Registration reg = bus.on($("topic"),
                          s -> System.out.printf("Got %s on thread %s%n", s, Thread.currentThread()));

bus.notify("topic", Event.wrap("Hello World!")); 

// ...some time later...
reg.cancel(); 

// ...some time later...
bus.notify("topic", Event.wrap("Hello World!")); 
  1. 對(duì)給定主題發(fā)布一個(gè)事件,應(yīng)當(dāng)在控制臺(tái)中打印 Event.toString()。
  2. 取消 Registration 對(duì)象的注冊(cè),組織消息抵達(dá)Consumer。
  3. 這個(gè)通知不應(yīng)當(dāng)有任何結(jié)果。

牢記,取消一個(gè)Registration 的注冊(cè)將對(duì)內(nèi)部注冊(cè)表進(jìn)行原子訪(fǎng)問(wèn)。當(dāng)系統(tǒng)中存在大量流向消費(fèi)者的時(shí)間時(shí),有時(shí)在你的.cancel()調(diào)用完成后 注冊(cè)表(Registry) 清理緩存并移除Registration前,你的 ConsumerFunction依然會(huì)接收到一些事件。.cancel()方法可以被稱(chēng)為:"請(qǐng)求盡快的取消"。 在測(cè)試類(lèi)中你能夠察覺(jué)這一行為特征,測(cè)試類(lèi)中在.on()、.notify().cancel() 的調(diào)用之間沒(méi)有任何時(shí)間延遲。