EventBus
發(fā)布和響應(yīng)事件使用的是請(qǐng)求應(yīng)答模式。
常見(jiàn)的情景是,你希望能夠從運(yùn)行在 EventBus 配置好的調(diào)度器(Dispatcher)
中的任務(wù)里獲取應(yīng)答。Reactor 的 EventBus
提供了比簡(jiǎn)單的發(fā)布訂閱模型更全面的事件處理模型。除了 Cunsumer
,你也可以同樣注冊(cè)一個(gè)函數(shù),EventBus
會(huì)自動(dòng)將 Function
的返回值推送給 replyTo
主鍵中的主題。在這里,推薦使用 .receive() 和
.send()
方法,而不是 .on()
和.notify()
方法。
請(qǐng)求應(yīng)答
EventBus bus;
bus.on($("reply.sink"), ev -> {
System.out.printf("Got %s on thread %s%n", ev, Thread.currentThread())
});
bus.receive($("job.sink"), ev -> {
return doWork(ev);
});
bus.send("job.sink", Event.wrap("Hello World!", "reply.sink"));
consumer
,不進(jìn)行任何分析。Dispatcher
線(xiàn)程的 Function
,完成工作并返回結(jié)果。 replyTo
主鍵在總線(xiàn)中發(fā)布Event
。如果沒(méi)有一個(gè)發(fā)布應(yīng)答的通用主題,你可以將請(qǐng)求和應(yīng)答的操作綁定到一個(gè)單獨(dú)的對(duì) .sendAndReceive(Object, Event<?>, Consumer<Event<?>>)
方法的調(diào)用中。此方法將調(diào)用 .send()
,并在函數(shù)被調(diào)用時(shí)在 Dispatcher
線(xiàn)程調(diào)用給定的 replyTo
回調(diào)函數(shù)。
sendAndReceive()
EventBus bus;
bus.receive($("job.sink"), (Event<String> ev) -> {
return ev.getData().toUpperCase();
});
bus.sendAndReceive(
"job.sink",
Event.wrap("Hello World!"),
s -> System.out.printf("Got %s on thread %s%n", s, Thread.currentThread())
);
Dispatcher
線(xiàn)程完成工作并返回結(jié)果的 Function
。Event
,并在 Dispatcher
中安排給定的 replyTo Consumer
,將接收事件的函數(shù)的返回值作為輸入傳遞給它。取消任務(wù)
有時(shí)候你希望取消一個(gè)任務(wù),停止響應(yīng)事件通知。注冊(cè)函數(shù).on()
和 .receive()
將返回一個(gè) Registration
對(duì)象,如果持有該對(duì)象的引用,你可以用它取消給定Selector
的 Consumer
或 Function
。
EventBus bus;
Registration reg = bus.on($("topic"),
s -> System.out.printf("Got %s on thread %s%n", s, Thread.currentThread()));
bus.notify("topic", Event.wrap("Hello World!"));
// ...some time later...
reg.cancel();
// ...some time later...
bus.notify("topic", Event.wrap("Hello World!"));
Event.toString()
。Registration
對(duì)象的注冊(cè),組織消息抵達(dá)Consumer
。牢記,取消一個(gè)
Registration
的注冊(cè)將對(duì)內(nèi)部注冊(cè)表進(jìn)行原子訪(fǎng)問(wèn)。當(dāng)系統(tǒng)中存在大量流向消費(fèi)者的時(shí)間時(shí),有時(shí)在你的.cancel()
調(diào)用完成后 注冊(cè)表(Registry)
清理緩存并移除Registration
前,你的Consumer
或Function
依然會(huì)接收到一些事件。.cancel()
方法可以被稱(chēng)為:"請(qǐng)求盡快的取消"。 在測(cè)試類(lèi)中你能夠察覺(jué)這一行為特征,測(cè)試類(lèi)中在.on()、.notify()
和.cancel()
的調(diào)用之間沒(méi)有任何時(shí)間延遲。