鍍金池/ 教程/ 數(shù)據(jù)庫(kù)/ 使用非 JVM 語(yǔ)言開發(fā)
基礎(chǔ)知識(shí)
Bolts
事務(wù)性拓?fù)?/span>
附錄 A
使用非 JVM 語(yǔ)言開發(fā)
一個(gè)實(shí)際的例子
拓?fù)?/span>
準(zhǔn)備開始
附錄 C
Spouts
附錄 B

使用非 JVM 語(yǔ)言開發(fā)

有時(shí)候你可能想使用不是基于 JVM 的語(yǔ)言開發(fā)一個(gè) Storm 工程,你可能更喜歡使用別的語(yǔ)言或者想使用用某種語(yǔ)言編寫的庫(kù)。

Storm 是用 Java 實(shí)現(xiàn)的,你看到的所有這本書中的 spout 和 bolt 都是用 java 編寫的。那么有可能使用像 Python、Ruby、 或者 JavaScript 這樣的語(yǔ)言編寫 spout 和 bolt 嗎?答案是當(dāng)然

可以!可以使用多語(yǔ)言協(xié)議達(dá)到這一目的。

多語(yǔ)言協(xié)議是 Storm 實(shí)現(xiàn)的一種特殊的協(xié)議,它使用標(biāo)準(zhǔn)輸入輸出作為 spout 和 bolt 進(jìn)程間的通訊通道。消息以 JSON 格式或純文本格式在通道中傳遞。

我們看一個(gè)用非 JVM 語(yǔ)言開發(fā) spout 和 bolt 的簡(jiǎn)單例子。在這個(gè)例子中有一個(gè) spout 產(chǎn)生從1到10,000的數(shù)字,一個(gè) bolt 過(guò)濾素?cái)?shù),二者都用 PHP 實(shí)現(xiàn)。

NOTE: 在這個(gè)例子中,我們使用一個(gè)很笨的辦法驗(yàn)證素?cái)?shù)。有更好當(dāng)然也更復(fù)雜的方法,它們已經(jīng)超出了這個(gè)例子的范圍。

有一個(gè)專門為 Storm 實(shí)現(xiàn)的 PHP DSL (譯者注:領(lǐng)域特定語(yǔ)言),我們將會(huì)在例子中展示我們的實(shí)現(xiàn)。首先定義拓?fù)洹?/p>

1
...
2
TopologyBuilder builder = new TopologyBuilder();
3
builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));
4
builder.setBolt("prime-numbers-filter", new
5
PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator");
6
StormTopology topology = builder.createTopology();
7
...  

NOTE:有一種使用非 JVM 語(yǔ)言定義拓?fù)涞姆绞?。既?Storm 拓?fù)涫?Thrift 架構(gòu),而且Nimbus 是一個(gè) Thrift 守護(hù)進(jìn)程,你就可以使用任何你想用的語(yǔ)言創(chuàng)建并提交拓?fù)?。但是這已經(jīng)超出了本書的范疇了。

這里沒(méi)什么新鮮了。我們看一下 NumbersGeneratorSpout 的實(shí)現(xiàn)。

01
public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
02
    public NumberGeneratorSpout(Integer from, Integer to) {
03
       super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString());
04
    }
05
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
06
        declarer.declare(new Fields("number"));
07
    }
08
    public Map<String, Object> getComponentConfiguration() {
09
        return null;
10
    }
11
}  

你可能已經(jīng)注意到了,這個(gè) spout 繼承了 ShellSpout。 這是個(gè)由 Storm 提供的特殊的類,用來(lái)幫助你運(yùn)行并控制用其它語(yǔ)言編寫的 spout。 在這種情況下它告訴 Storm 如何執(zhí)行你的PHP 腳本。

NumberGeneratorSpout 的 PHP 腳本向標(biāo)準(zhǔn)輸出分發(fā)元組,并從標(biāo)準(zhǔn)輸入讀取確認(rèn)或失敗信號(hào)。

在開始實(shí)現(xiàn) NumberGeneratorSpout.php 腳本之前,多觀察一下多語(yǔ)言協(xié)議是如何工作的。

spout 按照傳遞給構(gòu)造器的參數(shù)從 fromto 順序生成數(shù)字。

接下來(lái)看看 PrimeNumbersFilterBolt。 這個(gè)類實(shí)現(xiàn)了之前提到的殼。它告訴 Storm 如何執(zhí)行你的PHP腳本。 Storm 為這一目的提供了一個(gè)特殊的叫做 ShellBolt 的類,你惟一要做的事就是指出如何運(yùn)行腳本以及聲明要分發(fā)的屬性。

1
public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
2
    public PrimeNumbersFilterBolt() {
3
        super("php", "-f", "PrimeNumbersFilterBolt.php");
4
    }
5
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
6
        declarer.declare(new Fields("number"));
7
    }
8
}  

在這個(gè)構(gòu)造器中只是告訴 Storm 如何運(yùn)行PHP腳本。它與下列命令等價(jià)。

1
php -f PrimeNumbersFilterBolt.php  

PrimeNumbersFilterBolt.php 腳本從標(biāo)準(zhǔn)輸入讀取元組,處理它們,然后向標(biāo)準(zhǔn)輸出分發(fā)、確認(rèn)或失敗。在開始這個(gè)腳本之前,我們先多了解一些多語(yǔ)言協(xié)議的工作方式。

  1. 發(fā)起一次握手
  2. 開始循環(huán)
  3. 讀/寫元組

NOTE:有一種特殊的方式可以使用 Storm 的內(nèi)建日志機(jī)制在你的腳本中記錄日志,所以你不需要自己實(shí)現(xiàn)日志系統(tǒng)。

下面我們來(lái)看一看上述每一步的細(xì)節(jié),以及如何用 PHP 實(shí)現(xiàn)它。

發(fā)起握手

為了控制整個(gè)流程(開始以及結(jié)束它),Storm 需要知道它執(zhí)行的腳本進(jìn)程號(hào)(PID)。根據(jù)多語(yǔ)言協(xié)議,你的進(jìn)程開始時(shí)發(fā)生的第一件事就是 Storm 要向標(biāo)準(zhǔn)輸入(譯者注:根據(jù)上下文理解,本章提到的標(biāo)準(zhǔn)輸入輸出都是從非 JVM 語(yǔ)言的角度理解的,這里提到的標(biāo)準(zhǔn)輸入也就是 PHP 的標(biāo)準(zhǔn)輸入)發(fā)送一段 JSON 數(shù)據(jù),它包含 Storm 配置、拓?fù)渖舷挛暮鸵粋€(gè)進(jìn)程號(hào)目錄。它看起來(lái)就像下面的樣子:

{
    "conf": {
        "topology.message.timeout.secs": 3,
        // etc
    },
    "context": {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt"
        },
        "taskid": 3
    },
    "pidDir": "..."
}  

腳本進(jìn)程必須在 pidDir 指定的目錄下以自己的進(jìn)程號(hào)為名字創(chuàng)建一個(gè)文件,并以 JSON 格式把進(jìn)程號(hào)寫到標(biāo)準(zhǔn)輸出。

{"pid": 1234} 舉個(gè)例子,如果你收到 /tmp/example\n 而你的腳本進(jìn)程號(hào)是123,你應(yīng)該創(chuàng)建一個(gè)名為 /tmp/example/123 的空文件并向標(biāo)準(zhǔn)輸出打印文本行 {“pid”: 123}\n(譯者注:此處原文只有一個(gè) n,譯者猜測(cè)應(yīng)是排版錯(cuò)誤)和 end\n。 這樣 Storm 就能持續(xù)追蹤進(jìn)程號(hào)并在它關(guān)閉時(shí)殺死腳本進(jìn)程。下面是 PHP 實(shí)現(xiàn):

1
$config = json_decode(read_msg(), true);
2
$heartbeatdir = $config['pidDir'];
3
$pid = getmypid();
4
fclose(fopen("$heartbeatdir/$pid", "w"));
5
storm_send(["pid"=>$pid]);
6
flush();  

你已經(jīng)實(shí)現(xiàn)了一個(gè)叫做 read_msg 的函數(shù),用來(lái)處理從標(biāo)準(zhǔn)輸入讀取的消息。按照多語(yǔ)言協(xié)議的聲明,消息可以是單行或多行 JSON 文本。一條消息以 end\n 結(jié)束。

01
function read_msg() {
02
    $msg = "";
03
    while(true) {
04
        $l = fgets(STDIN);
05
        $line = substr($l,0,-1);
06
        if($line=="end") {
07
            break;
08
        }
09
        $msg = "$msg$line\n";
10
    }
11
    return substr($msg, 0, -1);
12
}
13
function storm_send($json) {
14
    write_line(json_encode($json));
15
    write_line("end");
16
}
17
function write_line($line) {
18
    echo("$line\n");
19
}  

NOTE:flush() 方法非常重要;有可能字符緩沖只有在積累到一定程度時(shí)才會(huì)清空。這意味著你的腳本可能會(huì)為了等待一個(gè)來(lái)自 Storm 的輸入而永遠(yuǎn)掛起,而 Storm 卻在等待來(lái)自你的腳本的輸出。因此當(dāng)你的腳本有內(nèi)容輸出時(shí)立即清空緩沖是很重要的。

開始循環(huán)以及讀/寫元組

這是整個(gè)工作中最重要的一步。這一步的實(shí)現(xiàn)取決于你開發(fā)的 spout 和 bolt。

如果是 spout,你應(yīng)當(dāng)開始分發(fā)元組。如果是 bolt,就循環(huán)讀取元組,處理它們,分發(fā)它發(fā),確認(rèn)成功或失敗。

下面我們就看看用來(lái)分發(fā)數(shù)字的 spout。

01
$from = intval($argv[1]);
02
$to = intval($argv[2]);
03
while(true) {
04
    $msg = read_msg();
05
    $cmd = json_decode($msg, true);
06
    if ($cmd['command']=='next') {
07
        if ($from<$to) {
08
            storm_emit(array("$from"));
09
            $task_ids = read_msg();
10
            $from++;
11
        } else {
12
            sleep(1);
13
        }
14
    }
15
    storm_sync();
16
}  

從命令行獲取參數(shù) fromto,并開始迭代。每次從 Storm 得到一條 next 消息,這意味著你已準(zhǔn)備好分發(fā)下一個(gè)元組。

一旦你發(fā)送了所有的數(shù)字,而且沒(méi)有更多元組可發(fā)了,就休眠一段時(shí)間。

為了確保腳本已準(zhǔn)備好發(fā)送下一個(gè)元組,Storm 會(huì)在發(fā)送下一條之前等待 sync\n 文本行。調(diào)用 read_msg(),讀取一條命令,解析 JSON。

對(duì)于 bolts 來(lái)說(shuō),有少許不同。

01
while(true) {
02
    $msg = read_msg();
03
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
04
    if (!empty($tuple["id"])) {
05
        if (isPrime($tuple["tuple"][0])) {
06
            storm_emit(array($tuple["tuple"][0]));
07
        }
08
        storm_ack($tuple["id"]);
09
    }
10
}  

循環(huán)的從標(biāo)準(zhǔn)輸入讀取元組。解析讀取每一條 JSON 消息,判斷它是不是一個(gè)元組,如果是,再檢查它是不是一個(gè)素?cái)?shù),如果是素?cái)?shù)再次分發(fā)一個(gè)元組,否則就忽略掉,最后不論如何都要確認(rèn)成功。

NOTE:在 json_decode 函數(shù)中使用的 JSON_BIGINT_AS_STRING 是為了解決一個(gè)在JAVA 和 PHP 之間的數(shù)據(jù)轉(zhuǎn)換問(wèn)題。JAVA發(fā)送的一些很大的數(shù)字,在 PHP 中會(huì)丟失精度,這樣就會(huì)導(dǎo)致問(wèn)題。為了避開這個(gè)問(wèn)題,告訴PHP把大數(shù)字當(dāng)作字符串處理,并在 JSON 消息中輸出數(shù)字時(shí)不使用雙引號(hào)。PHP5.4.0 或更高版本要求使用這個(gè)參數(shù)。

emit,ack,fail,以及 log 消息都是如下結(jié)構(gòu):

emit

{
    "command": "emit",
    "tuple": ["foo", "bar"]
}  

其中的數(shù)組包含了你分發(fā)的元組數(shù)據(jù)。

ack

{
    "command": "ack",
    "id": 123456789
}  

其中的 id 就是你處理的元組的 ID。

fail

{
    "command": "fail",
    "id": 123456789
}   

ack(譯者注:原文是 emit 從上下 JSON 的內(nèi)容和每個(gè)方法的功能上判斷此處就是 ack,可能是排版錯(cuò)誤)相同,其中 id 就是你處理的元組 ID。

log

{
    "command": "log",
    "msg": "some message to be logged by storm."
}   

下面是完整的的 PHP 代碼。

001
//你的spout:
002
<?php
003
function read_msg() {
004
    $msg = "";
005
    while(true) {
006
        $l = fgets(STDIN);
007
        $line = substr($l,0,-1);
008
        if ($line=="end") {
009
            break;
010
        }
011
        $msg = "$msg$line\n";
012
    }
013
    return substr($msg, 0, -1);
014
}
015
function write_line($line) {
016
    echo("$line\n");
017
}
018
function storm_emit($tuple) {
019
    $msg = array("command" => "emit", "tuple" => $tuple);
020
    storm_send($msg);
021
}
022
function storm_send($json) {
023
    write_line(json_encode($json));
024
    write_line("end");
025
}
026
function storm_sync() {
027
    storm_send(array("command" => "sync"));
028
}
029
function storm_log($msg) {
030
    $msg = array("command" => "log", "msg" => $msg);
031
    storm_send($msg);
032
    flush();
033
}
034
$config = json_decode(read_msg(), true);
035
$heartbeatdir = $config['pidDir'];
036
$pid = getmypid();
037
fclose(fopen("$heartbeatdir/$pid", "w"));
038
storm_send(["pid"=>$pid]);
039
flush();
040
$from = intval($argv[1]);
041
$to = intval($argv[2]);
042
while(true) {
043
    $msg = read_msg();
044
    $cmd = json_decode($msg, true);
045
    if ($cmd['command']=='next') {
046
        if ($from<$to) {
047
            storm_emit(array("$from"));
048
            $task_ids = read_msg();
049
            $from++;
050
        } else {
051
            sleep(1);
052
        }
053
    }
054
    storm_sync();
055
}
056
?>
057
//你的bolt:
058
<?php
059
function isPrime($number) {
060
    if ($number < 2) {
061
        return false;
062
    }
063
    if ($number==2) {
064
        return true;
065
    }
066
    for ($i=2; $i<=$number-1; $i++) {
067
        if ($number % $i == 0) {
068
            return false;
069
        }
070
    }
071
    return true;
072
}
073
function read_msg() {
074
    $msg = "";
075
    while(true) {
076
        $l = fgets(STDIN);
077
        $line = substr($l,0,-1);
078
        if ($line=="end") {
079
            break;
080
        }
081
        $msg = "$msg$line\n";
082
    }
083
    return substr($msg, 0, -1);
084
}
085
function write_line($line) {
086
    echo("$line\n");
087
}
088
function storm_emit($tuple) {
089
    $msg = array("command" => "emit", "tuple" => $tuple);
090
    storm_send($msg);
091
}
092
function storm_send($json) {
093
    write_line(json_encode($json));
094
    write_line("end");
095
}
096
function storm_ack($id) {
097
    storm_send(["command"=>"ack", "id"=>"$id"]);
098
}
099
function storm_log($msg) {
100
    $msg = array("command" => "log", "msg" => "$msg");
101
    storm_send($msg);
102
}
103
$config = json_decode(read_msg(), true);
104
$heartbeatdir = $config['pidDir'];
105
$pid = getmypid();
106
fclose(fopen("$heartbeatdir/$pid", "w"));
107
storm_send(["pid"=>$pid]);
108
flush();
109
while(true) {
110
    $msg = read_msg();
111
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
112
    if (!empty($tuple["id"])) {
113
        if (isPrime($tuple["tuple"][0])) {
114
            storm_emit(array($tuple["tuple"][0]));
115
        }
116
        storm_ack($tuple["id"]);
117
    }
118
}
119
?>  

NOTE:需要重點(diǎn)指出的是,應(yīng)當(dāng)把所有的腳本文件保存在你的工程目錄下的一個(gè)名為multilang/resources 的子目錄中。這個(gè)子目錄被包含在發(fā)送給工人進(jìn)程的 jar 文件中。如果你不把腳本包含在這個(gè)目錄中,Storm 就不能運(yùn)行它們,并拋出一個(gè)錯(cuò)誤。

上一篇:Spouts下一篇:Bolts