鍍金池/ 教程/ Java/ reactor-核心
異步 TCP、UDP 及 HTTP
微批處理
編解碼器與緩沖區(qū)
組合多個服務(wù)調(diào)用
環(huán)境與調(diào)度者
環(huán)形緩沖區(qū)處理者
Stream 數(shù)據(jù)持久化
函數(shù)式功能
reactor-stream
概述
背壓和溢出
核心處理者
數(shù)據(jù)路由
響應(yīng)式數(shù)據(jù)流
什么是 Reactor?
支持響應(yīng)式背壓
微服務(wù)
組合操作
使用 Stream 和 Promise(約定) 協(xié)調(diào)任務(wù)
Reactor 介紹
分區(qū)
發(fā)布訂閱模型
錯誤處理
分析
核心概述
創(chuàng)建非阻塞服務(wù)
使用緩沖區(qū)
請求應(yīng)答模式
關(guān)于該項目
理解線程模型
reactor-核心
響應(yīng)式擴展
Streams 的基礎(chǔ)知識
構(gòu)架總覽
Rx 之外的其它 API
注冊表
使用窗口
使用前提

reactor-核心

永遠別獨自展開異步工作。
— Jon Brisbin
在寫 Reactor 1 之后

永遠別獨自展開異步工作。
— Stephane Maldini 在寫 Reactor 2 之后

先來看看,某項目是如何使用 Groovy 的:

// 初始化上下文,獲取默認調(diào)度者
Environment.initialize()

// RingBufferDispatcher,默認帶 8192 槽容量
def dispatcher = Environment.sharedDispatcher()

// 創(chuàng)建回調(diào)
Consumer<Integer> c = { data ->
        println "some data arrived: $data"
    }

// 創(chuàng)建 error 回調(diào)

Consumer<Throwable errorHandler = { it.printStackTrace }

// 異步分發(fā)數(shù)據(jù)
dispatcher.dispatch(1234, c, errorHandler)

Environment.terminate()

然后,再看看響應(yīng)式數(shù)據(jù)流例子

// 獨立異步處理者
def processor = RingBufferProcessor.<Integer>create()

// 發(fā)送數(shù)據(jù),確保數(shù)據(jù)的安全性,直到訂閱成功
processor.onNext(1234)
processor.onNext(5678)

// 消費整型數(shù)據(jù)
processor.subscribe(new Subscriber<Integer>(){

  void onSubscribe(Subscription s){
      //unbounded subscriber
      s.request Long.MAX
  }

  void onNext(Integer data){
      println data
  }

  void onError(Throwable err){
      err.printStackTrace()
  }

  void onComplete(){
      println 'done!'
  }
}

// 完全關(guān)閉內(nèi)部線程和調(diào)用
processor.onComplete()
上一篇:理解線程模型下一篇:微批處理