鍍金池/ 教程/ 數(shù)據(jù)庫/ 一個實際的例子
基礎(chǔ)知識
Bolts
事務(wù)性拓?fù)?/span>
附錄 A
使用非 JVM 語言開發(fā)
一個實際的例子
拓?fù)?/span>
準(zhǔn)備開始
附錄 C
Spouts
附錄 B

一個實際的例子

本章要闡述一個典型的網(wǎng)絡(luò)分析解決方案,而這類問題通常利用 Hadoop 批處理作為解決方案。與 Hadoop 不同的是,基于 Storm 的方案會實時輸出結(jié)果。

我們的這個例子有三個主要組件

  • 一個基于 Node.js 的 web 應(yīng)用,用于測試系統(tǒng)
  • 一個 Redis 服務(wù)器,用于持久化數(shù)據(jù)
  • 一個 Storm 拓?fù)洌糜诜植际綄崟r處理數(shù)據(jù)

http://wiki.jikexueyuan.com/project/storm/images/09.png" alt="" />

圖 架構(gòu)概覽

NOTE:你如果想先把這個例子運(yùn)行起來,請首先閱讀附錄C

基于 Node.js 的 web 應(yīng)用

我們已經(jīng)偽造了簡單的電子商務(wù)網(wǎng)站。這個網(wǎng)站只有三個頁面:一個主頁、一個產(chǎn)品頁和一個產(chǎn)品統(tǒng)計頁面。這個應(yīng)用基于 ExpressSocket.io 兩個框架實現(xiàn)了向瀏覽器推送內(nèi)容更新。制作這個應(yīng)用的目的是為了讓你體驗 Storm 集群功能并看到處理結(jié)果,但它不是本書的重點,所以我們不會對它的頁面做更詳細(xì)描述。

主頁

這個頁面提供了全部有效產(chǎn)品的鏈接。它從Redis服務(wù)器獲取數(shù)據(jù)并在頁面上把它們顯示出來。這個頁面的URL是http://localhost:3000/。

有效產(chǎn)品:

DVD 播放器(帶環(huán)繞立體聲系統(tǒng))

全高清藍(lán)光 dvd 播放器

媒體播放器(帶 USB 2.0 接口)

全高清攝像機(jī)

防水高清攝像機(jī)

防震防水高清攝像機(jī)

反射式攝像機(jī)

雙核安卓智能手機(jī)(帶 64GB SD卡)

普通移動電話

衛(wèi)星電話

64GB SD 卡

32GB SD 卡

16GB SD 卡

粉紅色智能手機(jī)殼

黑色智能手機(jī)殼

小山羊皮智能手機(jī)殼

產(chǎn)品頁

產(chǎn)品頁用來顯示指定產(chǎn)品的相關(guān)信息,例如,價格、名稱、分類。這個頁面的URL是:http://localhost:3000/product/:id。

產(chǎn)品頁:32英寸液晶電視

分類:電視機(jī)

價格:400

相關(guān)分類

產(chǎn)品統(tǒng)計頁

這個頁面顯示通過收集用戶瀏覽站點,用Storm集群計算的統(tǒng)計信息??梢燥@示為如下概要:瀏覽這個產(chǎn)品的用戶,在那些分類下面瀏覽了n次產(chǎn)品。該頁的URL是:http://localhost:3000/product/:id/stats。

瀏覽了該產(chǎn)品的用戶也瀏覽了以下分類的產(chǎn)品:

  1. 攝像機(jī)

  2. 播放器

  3. 手機(jī)殼

  4. 存儲卡

啟動這個 Node.js web 應(yīng)用

首先啟動 Redis 服務(wù)器,然后執(zhí)行如下命令啟動 web 應(yīng)用:

    node webapp/app.js  

為了向你演示,這個應(yīng)用會自動向 Redis 填充一些產(chǎn)品數(shù)據(jù)作為樣本。

Storm 拓?fù)?/h2>

為這個系統(tǒng)搭建 Storm 拓?fù)涞哪繕?biāo)是改進(jìn)產(chǎn)品統(tǒng)計的實時性。產(chǎn)品統(tǒng)計頁顯示了一個分類計數(shù)器列表,用來顯示訪問了其它同類產(chǎn)品的用戶數(shù)。這樣可以幫助賣家了解他們的用戶需求。拓?fù)浣邮諡g覽日志,并更新產(chǎn)品統(tǒng)計結(jié)果

http://wiki.jikexueyuan.com/project/storm/images/10.png" alt="" />

圖 Storm 拓?fù)涞妮斎肱c輸出

我們的 Storm 拓?fù)溆形鍌€組件:一個 spout 向拓?fù)涮峁?shù)據(jù),四個 bolt 完成統(tǒng)計任務(wù)。

UsersNavigationSpout

從用戶瀏覽數(shù)據(jù)隊列讀取數(shù)據(jù)發(fā)送給拓?fù)?/p>

GetCategoryBolt

從Redis服務(wù)器讀取產(chǎn)品信息,向數(shù)據(jù)流添加產(chǎn)品分類

UserHistoryBolt

讀取用戶以前的產(chǎn)品瀏覽記錄,向下一步分發(fā)Product:Category鍵值對,在下一步更新計數(shù)器

ProductCategoriesCounterBolt

追蹤用戶瀏覽特定分類下的商品次數(shù)

NewsNotifierBolt

通知web應(yīng)用立即更新用戶界面

下圖展示了拓?fù)涞墓ぷ鞣绞剑ㄒ妶D6-6)

package storm.analytics;
...
public class TopologyStarter {
    public static void main(String[] args) {
        Logger.getRootLogger().removeAllAppenders();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("read-feed", new UsersNavigationSpout(),3);
        builder.setBolt("get-categ", new GetCategoryBolt(),3)
               .shuffleGrouping("read-feed");
        builder.setBolt("user-history", new UserHistoryBolt(),5)
               .fieldsGrouping("get-categ", new Fields("user"));
        builder.setBolt("product-categ-counter", new ProductCategoriesCounterBolt(),5)
               .fieldsGrouping("user-history", new Fields("product"));
        builder.setBolt("news-notifier", new NewsNotifierBolt(),5)
               .shuffleGrouping("product-categ-counter");

        Config conf = new Config();
        conf.setDebug(true);
        conf.put("redis-host",REDIS_HOST);
        conf.put("redis-port",REDIS_PORT);
        conf.put("webserver", WEBSERVER);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("analytics", conf, builder.createTopology());
    }
}  

http://wiki.jikexueyuan.com/project/storm/images/11.png" alt="" />

Figure Storm拓?fù)?/p>

UsersNavigationSpout

UsersNavigationSpout 負(fù)責(zé)向拓?fù)涮峁g覽數(shù)據(jù)。每條瀏覽數(shù)據(jù)都是一個用戶瀏覽過的產(chǎn)品頁的引用。它們都被 web 應(yīng)用保存在 Redis 服務(wù)器。我們一會兒就要看到更多信息。

NOTE:下面的代碼塊就是相關(guān)代碼。

package storm.analytics;
public class UsersNavigationSpout extends BaseRichSpout {
    Jedis jedis;

    ...

    @Override
    public void nextTuple() {
        String content = jedis.rpop("navigation");
        if(content==null || "nil".equals(content)){
            try { Thread.sleep(300); } catch (InterruptedException e) {}
        } else {
            JSONObject obj=(JSONObject)JSONValue.parse(content);
            String user = obj.get("user").toString();
            String product = obj.get("product").toString();
            String type = obj.get("type").toString();
            HashMap<String, String> map = new HashMap<String, String>();
            map.put("product", product);
            NavigationEntry entry = new NavigationEntry(user, type, map);
            collector.emit(new Values(user, entry));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user", "otherdata"));
    }
}  

spout 首先調(diào)用 jedis.rpop(“navigation”) 從 Redis 刪除并返回 ”navigation” 列表最右邊的元素。如果列表已經(jīng)是空的,就休眠0.3秒,以免使用忙等待循環(huán)阻塞服務(wù)器。如果得到一條數(shù)據(jù)(數(shù)據(jù)是 JSON 格式),就解析它,并創(chuàng)建一個包含該數(shù)據(jù)的 NavigationEntry POJO:

  • 瀏覽頁面的用戶
  • 用戶瀏覽的頁面類型
  • 由頁面類型決定的額外頁面信息?!爱a(chǎn)品”頁的額外信息就是用戶瀏覽的產(chǎn)品 ID。

spout 調(diào)用 collector.emit(new Values(user, entry)) 分發(fā)包含這些信息的元組。這個元組的內(nèi)容是拓?fù)淅锵乱粋€ bolt 的輸入。

GetCategoryBolt

這個 bolt 非常簡單。它只負(fù)責(zé)反序列化前面的 spout 分發(fā)的元組內(nèi)容。如果這是產(chǎn)品頁的數(shù)據(jù),就通過 ProductsReader 類從 Redis 讀取產(chǎn)品信息,然后基于輸入的元組再分發(fā)一個新的包含具體產(chǎn)品信息的元組:

  • 用戶
  • 產(chǎn)品
  • 產(chǎn)品類別
package storm.analytics;

public class GetCategoryBolt extends BaseBasicBolt {
    private ProductReader reader;

    ...
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        NavigationEntry entry = (NavigationEntry)input.getValue(1);
        if("PRODUCT".equals(entry.getPageType())){
            try {
                String product = (String)entry.getOtherData().get("product");

                //調(diào)用產(chǎn)品條目API,得到產(chǎn)品信息
                Product itm = reader.readItem(product);
                if(itm == null) {
                    return;
                }
                String categ = itm.getCategory();
                collector.emit(new Values(entry.getUserId(), product, categ));
            } catch (Exception ex) {
                System.err.println("Error processing PRODUCT tuple"+ ex);
                ex.printStackTrace();
            }
        }
    }
    ...
}  

正如前面所提到的, 使用 ProductsReader 類讀取產(chǎn)品具體信息。

package storm.analytics.utilities;
...
public class ProductReader {
    ...
    public Product readItem(String id) throws Exception{
        String content = jedis.get(id);
        if(content == null || ("nil".equals(content))){
            return null;
        }
        Object obj = JSONValue.parse(content);
        JSONObjectproduct = (JSONObject)obj;
        Product i = new Product((Long)product.get("id"),
                                (String)product.get("title"),
                                (Long)product.get("price"),
                                (String)product.get("category"));
        return i;
    }
    ...
}  

UserHistoryBolt

UserHistoryBolt 是整個應(yīng)用的核心。它負(fù)責(zé)持續(xù)追蹤每個用戶瀏覽過的產(chǎn)品,并決定應(yīng)當(dāng)增加計數(shù)的鍵值對。

我們使用 Redis 保存用戶的產(chǎn)品瀏覽歷史,同時基于性能方面的考慮,還應(yīng)該保留一份本地副本。我們把數(shù)據(jù)訪問細(xì)節(jié)隱藏在方法 getUserNavigationHistory(user)addProductToHistory(user,prodKey) 里,分別用來讀/寫訪問。它們的實現(xiàn)如下

package storm.analytics;
...
public class UserHistoryBolt extends BaseRichBolt{
    @Override
    public void execute(Tuple input) {
        String user = input.getString(0);
        String prod1 = input.getString(1);
        String cat1 = input.getString(2);

        //產(chǎn)品鍵嵌入了產(chǎn)品類別信息
        String prodKey = prod1+":"+cat1;

        Set productsNavigated = getUserNavigationHistory(user);

        //如果用戶以前瀏覽過->忽略它
        if(!productsNavigated.contains(prodKey)) {
            //否則更新相關(guān)條目
            for (String other : productsNavigated) {
                String[] ot = other.split(":");
                String prod2 = ot[0];
                String cat2 = ot[1];
                collector.emit(new Values(prod1, cat2));
                collector.emit(new Values(prod2, cat1));
            }
            addProductToHistory(user, prodKey);
        }
    }
}  

需要注意的是,這個 bolt 的輸出是那些類別計數(shù)應(yīng)當(dāng)獲得增長的產(chǎn)品。

看一看代碼。這個 bolt 維護(hù)著一組被每個用戶瀏覽過的產(chǎn)品。值得注意的是,這個集合包含產(chǎn)品:類別鍵值對,而不是只有產(chǎn)品。這是因為你會在接下來的調(diào)用中用到類別信息,而且這樣也比每次從數(shù)據(jù)庫獲取更高效。這樣做的原因是基于以下考慮,產(chǎn)品可能只有一個類別,而且它在整個產(chǎn)品的生命周期當(dāng)中不會改變。

讀取了用戶以前瀏覽過的產(chǎn)品集合之后(以及它們的類別),檢查當(dāng)前產(chǎn)品以前有沒有被瀏覽過。如果瀏覽過,這條瀏覽數(shù)據(jù)就被忽略了。如果這是首次瀏覽,遍歷用戶瀏覽歷史,并執(zhí)行collector.emit(new Values(prod1,cat2)) 分發(fā)一個元組,這個元組包含當(dāng)前產(chǎn)品和所有瀏覽歷史類別。第二個元組包含所有瀏覽歷史產(chǎn)品和當(dāng)前產(chǎn)品類別,由 collectior.emit(new Values(prod2,cat1))。最后,將當(dāng)前產(chǎn)品和它的類別添加到集合。

比如,假設(shè)用戶 John 有以下瀏覽歷史:

http://wiki.jikexueyuan.com/project/storm/images/12.png" alt="" />

下面是將要處理的瀏覽數(shù)據(jù)

http://wiki.jikexueyuan.com/project/storm/images/13.png" alt="" />

該用戶沒有瀏覽過產(chǎn)品8,因此你需要處理它。

因此要分發(fā)以下元組:

http://wiki.jikexueyuan.com/project/storm/images/14.png" alt="" />

注意,左邊的產(chǎn)品和右邊的類別之間的關(guān)系應(yīng)當(dāng)作為一個整體遞增。

現(xiàn)在,讓我們看看這個 Bolt 用到的持久化實現(xiàn)。

public class UserHistoryBolt extends BaseRichBolt{
    ...
    private Set getUserNavigationHistory(String user) {
        Set userHistory = usersNavigatedItems.get(user);
        if(userHistory == null) {
            userHistory = jedis.smembers(buildKey(user));
            if(userHistory == null)
                userHistory = new HashSet();
            usersNavigatedItems.put(user, userHistory);
        }
        return userHistory;
    }
    private void addProductToHistory(String user, String product) {
        Set userHistory = getUserNavigationHistory(user);
        userHistory.add(product);
        jedis.sadd(buildKey(user), product);
    }
    ...
}  

getUserNavigationHistory 方法返回用戶瀏覽過的產(chǎn)品集。首先,通過usersNavigatedItems.get(user) 方法試圖從本地內(nèi)存得到用戶瀏覽歷史,否則,使用jedis.smembers(buildKey(user)) 從 Redis 服務(wù)器獲取,并把數(shù)據(jù)添加到本地數(shù)據(jù)結(jié)構(gòu)usersNavigatedItems。

當(dāng)用戶瀏覽一個新產(chǎn)品時,調(diào)用 addProductToHistory,通過 userHistory.add(product) 和 jedis.sadd(buildKey(user),product) 同時更新內(nèi)存數(shù)據(jù)結(jié)構(gòu)和 Redis 服務(wù)器。

需要注意的是,當(dāng)你需要做并行化處理時,只要 bolt 在內(nèi)存中維護(hù)著用戶數(shù)據(jù),你就得首先通過用戶做域數(shù)據(jù)流分組(譯者注:原文是 fieldsGrouping,詳細(xì)情況請見第三章的域數(shù)據(jù)流組),這是一件很重要的事情,否則集群內(nèi)將會有用戶瀏覽歷史的多個不同步的副本。

ProductCategoriesCounterBolt

該類持續(xù)追蹤所有的產(chǎn)品-類別關(guān)系。它通過由 UsersHistoryBolt 分發(fā)的產(chǎn)品-類別數(shù)據(jù)對更新計數(shù)。

每個數(shù)據(jù)對的出現(xiàn)次數(shù)保存在 Redis 服務(wù)器?;谛阅芊矫娴目紤],要使用一個本地讀寫緩存,通過一個后臺線程向 Redis 發(fā)送數(shù)據(jù)。

該Bolt會向拓?fù)涞南乱粋€ Bolt ——NewsNotifierBolt——發(fā)送包含最新記數(shù)的元組,這也是最后一個 Bolt,它會向最終用戶廣播實時更新的數(shù)據(jù)。

public class ProductCategoriesCounterBolt extends BaseRichBolt {
    ...
    @Override
    public void execute(){
        String product = input.getString(0);
        String categ = input.getString(1);
        int total = count(product, categ);
        collector.emit(new Values(product, categ, total));
    }
    ...
    private int count(String product, String categ) {
        int count = getProductCategoryCount(categ, product);
        count++;
        storeProductCategoryCount(categ, product, count);
        return count;
    }
    ...
}  

這個 bolt 的持久化工作隱藏在 getProductCategoryCountstoreProductCategoryCount 兩個方法中。它們的具體實現(xiàn)如下:

package storm.analytics;
...
public class ProductCategoriesCounterBolt extends BaseRichBolt {
    // 條目:分類 -> 計數(shù)
    HashMap<String,Integer> counter = new HashMap<String, Integer>();

    //條目:分類 -> 計數(shù)
    HashMap<String,Integer> pendingToSave = new HashMap<String,Integer>();

    ...
    public int getProductCategoryCount(String categ, String product) {
        Integer count = counter.get(buildLocalKey(categ, product));
        if(count == null) {
            String sCount = jedis.hget(buildRedisKey(product), categ);
            if(sCount == null || "nil".equals(sCount)) {
                count = 0;
            } else {
                count = Integer.valueOf(sCount);
            }
        }
        return count;
    }
    ...
    private void storeProductCategoryCount(String categ, String product, int count) {
        String key = buildLocalKey(categ, product);
        counter.put(key, count);
        synchronized (pendingToSave) {
            pendingToSave.put(key, count);
        }
    }
    ...
}  

方法 getProductCategoryCount 首先檢查內(nèi)存緩存計數(shù)器。如果沒有有效令牌,就從 Redis 服務(wù)器取得數(shù)據(jù)。

方法 storeProductCategoryCount 更新計數(shù)器緩存和 pendingToSae 緩沖。緩沖數(shù)據(jù)由下述后臺線程持久化。

package storm.analytics;

public class ProductCategoriesCounterBolt extends BaseRichBolt {
...
    private void startDownloaderThread() {
        TimerTask t = startDownloaderThread() {
            @Override
            public void run () {
                HashMap<String, Integer> pendings;
                synchronized (pendingToSave) {
                    pendings = pendingToSave;
                    pendingToSave = new HashMap<String,Integer>();
                }

                for (String key : pendings.keySet) {
                    String[] keys = key.split(":");
                    String product = keys[0];
                    String categ = keys[1];
                    Integer count = pendings.get(key);
                    jedis.hset(buildRedisKey(product), categ, count.toString());
                }
            }
        };
        timer = new Timer("Item categories downloader");
        timer.scheduleAtFixedRate(t, downloadTime, downloadTime);
    }
    ...
}  

下載線程鎖定 pendingToSave, 向 Redis 發(fā)送數(shù)據(jù)時會為其它線程創(chuàng)建一個新的緩沖。這段代碼每隔 downloadTime 毫秒運(yùn)行一次,這個值可由拓?fù)渑渲脜?shù) download-time 配置。download-time 值越大,寫入 Redis 的次數(shù)就越少,因為一對數(shù)據(jù)的連續(xù)計數(shù)只會向 Redis寫一次。

NewsNotifierBolt

為了讓用戶能夠?qū)崟r查看統(tǒng)計結(jié)果,由 NewsNotifierBolt 負(fù)責(zé)向web應(yīng)用通知統(tǒng)計結(jié)果的變化。通知機(jī)制由 Apache HttpClient 通過 HTTP POST 訪問由拓?fù)渑渲脜?shù)指定的 URL。POST 消息體是 JSON 格式。

測試時把這個 bolt 從拜年中刪除。

01
package storm.analytics;
02
...
03
public class NewsNotifierBolt extends BaseRichBolt {
04
...
05
@Override
06
public void execute(Tuple input) {
07
String product = input.getString(0);
08
String categ = input.getString(1);
09
int visits = input.getInteger(2);</code>
10

11
String content = "{\"product\":\"+product+"\",\"categ\":\""+categ+"\",\"visits\":"+visits+"}";
12
HttpPost post = new HttpPost(webserver);
13
try {
14
post.setEntity(new StringEntity(content));
15
HttpResponse response = client.execute(post);
16
org.apache.http.util.EntityUtils.consume(response.getEntity());
17
} catch (Exception e) {
18
e.printStackTrace();
19
reconnect();
20
}
21
}
22
...
23
}  

Redis 服務(wù)器

Redis 是一種先進(jìn)的、基于內(nèi)存的、支持持久化的鍵值存儲(見http://redis.io)。本例使用它存儲以下信息:

  • 產(chǎn)品信息,用來為 web 站點服務(wù)
  • 用戶瀏覽隊列,用來為 Storm 拓?fù)涮峁?shù)據(jù)
  • Storm 拓?fù)涞闹虚g數(shù)據(jù),用于拓?fù)浒l(fā)生故障時恢復(fù)數(shù)據(jù)
  • Storm 拓?fù)涞奶幚斫Y(jié)果,也就是我們期望得到的結(jié)果。

產(chǎn)品信息

Redis 服務(wù)器以產(chǎn)品 ID 作為鍵,以 JSON 字符串作為值保存著產(chǎn)品信息。

1
redis-cli
2
redis 127.0.0.1:6379&gt; get 15
3
"{\"title\":\"Kids smartphone cover\",\"category\":\"Covers\",\"price\":30,\"id\":
4
15}"  

用戶瀏覽隊列

用戶瀏覽隊列保存在 Redis 中一個鍵為 navigation 的先進(jìn)先出隊列中。用戶瀏覽一個產(chǎn)品頁時,服務(wù)器從隊列左側(cè)添加用戶瀏覽數(shù)據(jù)。Storm 集群不斷的從隊列右側(cè)獲取并移除數(shù)據(jù)。

01
redis 127.0.0.1:6379&gt; llen navigation
02
(integer) 5
03
redis 127.0.0.1:6379&gt; lrange navigation 0 4
04
1) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"1\",\"type\":
05
\"PRODUCT\"}"
06
2) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"1\",\"type\":
07
\"PRODUCT\"}"
08
3) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"2\",\"type\":
09
\"PRODUCT\"}"
10
4) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"3\",\"type\":
11
\"PRODUCT\"}"
12
5) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"5\",\"type\":
13
\"PRODUCT\"}"  

中間數(shù)據(jù)

集群需要分開保存每個用戶的歷史數(shù)據(jù)。為了實現(xiàn)這一點,它在 Redis 服務(wù)器上保存著一個包含所有用戶瀏覽過的產(chǎn)品和它們的分類的集合。

1
redis 127.0.0.1:6379&gt; smembers history:59c34159-0ecb-4ef3-a56b-99150346f8d5
2
1) "1:Players"
3
2) "5:Cameras"
4
3) "2:Players"
5
4) "3:Cameras"  

結(jié)果

Storm 集群生成關(guān)于用戶瀏覽的有用數(shù)據(jù),并把它們的產(chǎn)品 ID 保存在一個名為 “prodcnt” 的Redis hash 中。

1
redis 127.0.0.1:6379&gt; hgetall prodcnt:2
2
1) "Players"
3
2) "1"
4
3) "Cameras"
5
4) "2"  

測試拓?fù)?/h3>

使用 LocalCluster 和一個本地 Redis 服務(wù)器執(zhí)行測試。向 Redis 填充產(chǎn)品數(shù)據(jù),偽造訪問日志。我們的斷言會在讀取拓?fù)湎?Redis 輸出的數(shù)據(jù)時執(zhí)行。測試用戶用 java 和 groovy 完成。

http://wiki.jikexueyuan.com/project/storm/images/15.png" alt="" />

測試架構(gòu)

初始化測試

初始化由以下三步組成:

啟動 LocalCluster 并提交拓?fù)?/strong>。初始化在 AbstractAnalyticsTest 實現(xiàn),所有測試用例都繼承該類。當(dāng)初始化多個 AbstractAnalyticsTest 子類對象時,由一個名為topologyStarted 的靜態(tài)標(biāo)志屬性確定初始化工作只會進(jìn)行一次。

需要注意的是,sleep 語句是為了確保在試圖獲取結(jié)果之前 LocalCluster 已經(jīng)正確啟動了。

01
public abstract class AbstractAnalyticsTest extends Assert {
02
 def jedis
03
 static topologyStarted = false
04
 static sync= new Object()
05
 private void reconnect() {
06
 jedis = new Jedis(TopologyStarter.REDIS_HOST, TopologyStarter.REDIS_PORT)
07
 }
08
 @Before
09
 public void startTopology(){
10
 synchronized(sync){
11
 reconnect()
12
 if(!topologyStarted){
13
 jedis.flushAll()
14
 populateProducts()
15
 TopologyStarter.testing = true
16
 TopologyStarter.main(null)
17
 topologyStarted = true
18
 sleep 1000
19
 }
20
 }
21
 }
22
 ...
23
 public void populateProducts() {
24
 def testProducts = [
25
 [id: 0, title:"Dvd player with surround sound system",
26
 category:"Players", price: 100],
27
 [id: 1, title:"Full HD Bluray and DVD player",
28
 category:"Players", price:130],
29
 [id: 2, title:"Media player with USB 2.0 input",
30
 category:"Players", price:70],
31
 ...
32
 [id: 21, title:"TV Wall mount bracket 50-55 Inches",
33
 category:"Mounts", price:80]
34
 ]
35
 testProducts.each() { product -&gt;
36
 def val =
37
 "{ \"title\": \"${product.title}\" , \"category\": \"${product.category}\"," +
38
 " \"price\": ${product.price}, \"id\": ${product.id} }"
39
 println val
40
 jedis.set(product.id.toString(), val.toString())
41
 }
42
 }
43
 ...
44
}  

在 AbstractAnalyticsTest 中實現(xiàn)一個名為 navigate 的方法。為了測試不同的場景,我們要模擬用戶瀏覽站點的行為,這一步向 Redis 的瀏覽隊列(譯者注:就是前文提到的鍵是navigation 的隊列)插入瀏覽數(shù)據(jù)。

01
public abstract class AbstractAnalyticsTest extends Assert {
02
 ...
03
public void navigate(user, product) {
04
 String nav =
05
 "{\"user\": \"${user}\", \"product\": \"${product}\", \"type\": \"PRODUCT\"}".toString()
06
 println "Pushing navigation: ${nav}"
07
 jedis.lpush('navigation', nav)
08
 }
09
 ...
10
}  

實現(xiàn)一個名為 getProductCategoryStats 的方法,用來讀取指定產(chǎn)品與分類的數(shù)據(jù)。不同的測試同樣需要斷言統(tǒng)計結(jié)果,以便檢查拓?fù)涫欠癜凑掌谕哪菢訄?zhí)行了。

01
public abstract class AbstractAnalyticsTest extends Assert {
02
 ...
03
 public int getProductCategoryStats(String product, String categ) {
04
 String count = jedis.hget("prodcnt:${product}", categ)
05
 if(count == null || "nil".equals(count))
06
 return 0
07
 return Integer.valueOf(count)
08
 }
09
 ...
10
}  

一個測試用例

下一步,為用戶“1”模擬一些瀏覽記錄,并檢查結(jié)果。注意執(zhí)行斷言之前要給系統(tǒng)留出兩秒鐘處理數(shù)據(jù)。(記住 ProductCategoryCounterBolt 維護(hù)著一份計數(shù)的本地副本,它是在后臺異步保存到 Redis 的。)

01
package functional
02
class StatsTest extends AbstractAnalyticsTest {
03
 @Test
04
 public void testNoDuplication(){
05
     navigate("1", "0") // Players
06
     navigate("1", "1") // Players
07
     navigate("1", "2") // Players
08
     navigate("1", "3") // Cameras
09
     Thread.sleep(2000) // Give two seconds for the system to process the data.
10
     assertEquals 1, getProductCategoryStats("0", "Cameras")
11
     assertEquals 1, getProductCategoryStats("1", "Cameras")
12
     assertEquals 1, getProductCategoryStats("2", "Cameras")
13
     assertEquals 2, getProductCategoryStats("0", "Players")
14
     assertEquals 3, getProductCategoryStats("3", "Players")
15
 }
16
}  

對可擴(kuò)展性和可用性的提示

為了能在一章的篇幅中講明白整個方案,它已經(jīng)被簡化了。正因如此,一些與可擴(kuò)展性和可用性有關(guān)的必要復(fù)雜性也被去掉了。這方面主要有兩個問題。

Redis 服務(wù)器不只是一個故障的節(jié)點,還是性能瓶頸。你能接收的數(shù)據(jù)最多就是 Redis 能處理的那些。Redis 可以通過分片增強(qiáng)擴(kuò)展性,它的可用性可以通過主從配置得到改進(jìn)。這都需要修改拓?fù)浜?web 應(yīng)用的代碼實現(xiàn)。

另一個缺點就是 web 應(yīng)用不能通過增加服務(wù)器成比例的擴(kuò)展。這是因為當(dāng)產(chǎn)品統(tǒng)計數(shù)據(jù)發(fā)生變化時,需要通知所有關(guān)注它的瀏覽器。這一“通知瀏覽器”的機(jī)制通過 Socket.io 實現(xiàn),但是它要求監(jiān)聽器和通知器在同一主機(jī)上。這一點只有當(dāng) GET /product/:id/stats POST /news 滿足以下條件時才能實現(xiàn),那就是這二者擁有相同的分片標(biāo)準(zhǔn),確保引用相同產(chǎn)品的請求由相同的服務(wù)器處理。

上一篇:準(zhǔn)備開始下一篇:附錄 C