鍍金池/ 教程/ Python/ 使用 python 構(gòu)建基于 hadoop 的 mapreduce 日志分析平臺(tái)
通過(guò) memcached 實(shí)現(xiàn)領(lǐng)號(hào)排隊(duì)功能及 python 隊(duì)列實(shí)例
利用 pypy 提高 python 腳本的執(zhí)行速度及測(cè)試性能
Python FAQ3-python 中 的原始(raw)字符串
Mongodb 千萬(wàn)級(jí)數(shù)據(jù)在 python 下的綜合壓力測(cè)試及應(yīng)用探討
Parallel Python 實(shí)現(xiàn)程序的并行多 cpu 多核利用【pp 模塊】
python simplejson 模塊淺談
服務(wù)端 socket 開(kāi)發(fā)之多線程和 gevent 框架并發(fā)測(cè)試[python 語(yǔ)言]
python Howto 之 logging 模塊
python 之 MySQLdb 庫(kù)的使用
關(guān)于 python 調(diào)用 zabbix api 接口的自動(dòng)化實(shí)例 [結(jié)合 saltstack]
python 之利用 PIL 庫(kù)實(shí)現(xiàn)頁(yè)面的圖片驗(yàn)證碼及縮略圖
Python 通過(guò) amqp 消息隊(duì)列協(xié)議中的 Qpid 實(shí)現(xiàn)數(shù)據(jù)通信
python 中用 string.maketrans 和 translate 巧妙替換字符串
python linecache 模塊讀取文件用法詳解
Python 批量更新 nginx 配置文件
python 計(jì)算文件的行數(shù)和讀取某一行內(nèi)容的實(shí)現(xiàn)方法
python+Django 實(shí)現(xiàn) Nagios 自動(dòng)化添加監(jiān)控項(xiàng)目
多套方案來(lái)提高 python web 框架的并發(fā)處理能力
python 寫(xiě)報(bào)警程序中的聲音實(shí)現(xiàn) winsound
python 調(diào)用 zabbix 的 api 接口添加主機(jī)、查詢組、主機(jī)、模板
對(duì) Python-memcache 分布式散列和調(diào)用的實(shí)現(xiàn)
使用 python 構(gòu)建基于 hadoop 的 mapreduce 日志分析平臺(tái)
一個(gè)腳本講述 python 語(yǔ)言的基礎(chǔ)規(guī)范,適合初學(xué)者
Python 編寫(xiě)的 socket 服務(wù)器和客戶端
如何將 Mac OS X10.9 下的 Python2.7 升級(jí)到最新的 Python3.3
python 監(jiān)控文件或目錄變化
報(bào)警監(jiān)控平臺(tái)擴(kuò)展功能 url 回調(diào)的設(shè)計(jì)及應(yīng)用 [python 語(yǔ)言]
Python 處理 cassandra 升級(jí)后的回滾腳本
python 實(shí)現(xiàn) select 和 epoll 模型 socket 網(wǎng)絡(luò)編程
關(guān)于 B+tree (附 python 模擬代碼)
通過(guò) python 和 websocket 構(gòu)建實(shí)時(shí)通信系統(tǒng)[擴(kuò)展 saltstack 監(jiān)控]

使用 python 構(gòu)建基于 hadoop 的 mapreduce 日志分析平臺(tái)

http://wiki.jikexueyuan.com/project/python-actual-combat/images/48.jpg" alt="pic" />

流量比較大的日志要是直接寫(xiě)入 Hadoop 對(duì) Namenode 負(fù)載過(guò)大,所以入庫(kù)前合并,可以把各個(gè)節(jié)點(diǎn)的日志湊并成一個(gè)文件寫(xiě)入 HDFS。 根據(jù)情況定期合成,寫(xiě)入到 hdfs 里面。

咱們看看日志的大小,200 G 的 dns 日志文件,我壓縮到了 18 G,要是用 awk perl 當(dāng)然也可以,但是處理速度肯定沒(méi)有分布式那樣的給力。

http://wiki.jikexueyuan.com/project/python-actual-combat/images/49.jpg" alt="pic" />

Hadoop Streaming 原理

mapper 和 reducer 會(huì)從標(biāo)準(zhǔn)輸入中讀取用戶數(shù)據(jù),一行一行處理后發(fā)送給標(biāo)準(zhǔn)輸出。Streaming 工具會(huì)創(chuàng)建 MapReduce 作業(yè),發(fā)送給各個(gè) tasktracker,同時(shí)監(jiān)控整個(gè)作業(yè)的執(zhí)行過(guò)程。

任何語(yǔ)言,只要是方便接收標(biāo)準(zhǔn)輸入輸出就可以做 mapreduce~

再搞之前我們先簡(jiǎn)單測(cè)試下 shell 模擬 mapreduce 的性能速度~

http://wiki.jikexueyuan.com/project/python-actual-combat/images/50.jpg" alt="pic" />

看下他的結(jié)果,350 M 的文件用時(shí) 35 秒左右。

http://wiki.jikexueyuan.com/project/python-actual-combat/images/51.jpg" alt="pic" />

這是 2 G 的日志文件,居然用了 3 分鐘。 當(dāng)然和我寫(xiě)的腳本也有問(wèn)題,我們是模擬 mapreduce 的方式,而不是調(diào)用 shell 下牛逼的 awk,gawk 處理。

http://wiki.jikexueyuan.com/project/python-actual-combat/images/52.jpg" alt="pic" />

awk 的速度!果然很霸道,處理日志的時(shí)候,我也很喜歡用 awk,只是學(xué)習(xí)的難度有點(diǎn)大,不像別的 shell 組件那么靈活簡(jiǎn)單。

http://wiki.jikexueyuan.com/project/python-actual-combat/images/1.jpg" alt="pic" />

這是官方的提供的兩個(gè) demo ~

map.py

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""
import sys
def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()
def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)
if __name__ == "__main__":
    main()

reduce.py 的修改方式

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)
def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass
if __name__ == "__main__":
    main()

咱們?cè)俸?jiǎn)單點(diǎn):

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)
#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

if current_word == word:
    print '%s\t%s' % (current_word, current_count)

咱們就簡(jiǎn)單模擬下數(shù)據(jù),跑個(gè)測(cè)試

http://wiki.jikexueyuan.com/project/python-actual-combat/images/54.jpg" alt="pic" />

剩下就沒(méi)啥了,在 hadoop 集群環(huán)境下,運(yùn)行 hadoop 的 steaming.jar 組件,加入 mapreduce 的腳本,指定輸出就行了. 下面的例子我用的是 shell 的成分。

[root@101 cron]#$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc

詳細(xì)的參數(shù),對(duì)于咱們來(lái)說(shuō)提供性能可以把 tasks 的任務(wù)數(shù)增加下,根據(jù)情況自己測(cè)試下,也別太高了,增加負(fù)擔(dān)。

(1)-input:輸入文件路徑
(2)-output:輸出文件路徑
(3)-mapper:用戶自己寫(xiě)的 mapper 程序,可以是可執(zhí)行文件或者腳本
(4)-reducer:用戶自己寫(xiě)的 reducer 程序,可以是可執(zhí)行文件或者腳本
(5)-file:打包文件到提交的作業(yè)中,可以是 mapper 或者 reducer 要用的輸入文件,如配置文件,字典等。
(6)-partitioner:用戶自定義的 partitioner 程序
(7)-combiner:用戶自定義的 combiner 程序(必須用 java 實(shí)現(xiàn))
(8)-D:作業(yè)的一些屬性(以前用的是-jonconf),具體有:

1)mapred.map.tasks:map task 數(shù)目
2)mapred.reduce.tasks:reduce task 數(shù)目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task 輸入/輸出數(shù) 據(jù)的分隔符,默認(rèn)均為 \t。
4)stream.num.map.output.key.fields:指定 map task 輸出記錄中 key 所占的域數(shù)目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task 輸入/輸出數(shù)據(jù)的分隔符,默認(rèn)均為 \t。
6)stream.num.reduce.output.key.fields:指定 reduce task 輸出記錄中 key 所占的域數(shù)目

這里是統(tǒng)計(jì) dns 的日志文件有多少行 ~

http://wiki.jikexueyuan.com/project/python-actual-combat/images/55.jpg" alt="pic" />

在 mapreduce 作為參數(shù)的時(shí)候,不能用太多太復(fù)雜的 shell 語(yǔ)言,他不懂的~

可以寫(xiě)成 shell 文件的模式;

#! /bin/bash
while read LINE; do
#  for word in $LINE
#  do
#    echo "$word 1"
        awk '{print $5}'                                                                                                       
  done
done
#! /bin/bash
count=0
started=0
word=""
while read LINE;do
  goodk=`echo $LINE | cut -d ' '  -f 1`
  if [ "x" == x"$goodk" ];then
     continue
  fi
  if [ "$word" != "$goodk" ];then
    [ $started -ne 0 ] && echo -e "$word\t$count"
    word=$goodk                                                                                                                
    count=1
    started=1
  else
    count=$(( $count + 1 ))
  fi
done

有時(shí)候會(huì)出現(xiàn)這樣的問(wèn)題,好好看看自己寫(xiě)的 mapreduce 程序 ~

13/12/14 13:26:52 INFO streaming.StreamJob: Tracking URL: http://101.rui.com:50030/jobdetails.jsp?jobid=job_201312131904_0030
13/12/14 13:26:53 INFO streaming.StreamJob:  map 0%  reduce 0%
13/12/14 13:27:16 INFO streaming.StreamJob:  map 100%  reduce 100%
13/12/14 13:27:16 INFO streaming.StreamJob: To kill this job, run:
13/12/14 13:27:16 INFO streaming.StreamJob: /usr/local/hadoop/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201312131904_0030
13/12/14 13:27:16 INFO streaming.StreamJob: Tracking URL: http://101.rui.com:50030/jobdetails.jsp?jobid=job_201312131904_0030
13/12/14 13:27:16 ERROR streaming.StreamJob: Job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201312131904_0030_m_000000
13/12/14 13:27:16 INFO streaming.StreamJob: killJob...
Streaming Command Failed!

python 做為 mapreduce 執(zhí)行成功后,結(jié)果和日志一般是放在你指定的目錄下的,結(jié)果是在 part-00000 文件里面~

http://wiki.jikexueyuan.com/project/python-actual-combat/images/56.jpg" alt="pic" />

下面咱們談下,如何入庫(kù)和后臺(tái)的執(zhí)行

本文出自 “峰云,就她了?!?博客,謝絕轉(zhuǎn)載!