鍍金池/ 教程/ Python/ 并發(fā)編程
類與對(duì)象
模塊與包
數(shù)據(jù)編碼和處理
元編程
網(wǎng)絡(luò)與 Web 編程
數(shù)字日期和時(shí)間
測(cè)試、調(diào)試和異常
字符串和文本
文件與 IO
腳本編程與系統(tǒng)管理
迭代器與生成器
函數(shù)
C 語(yǔ)言擴(kuò)展
并發(fā)編程
數(shù)據(jù)結(jié)構(gòu)和算法

并發(fā)編程

對(duì)于并發(fā)編程, Python 有多種長(zhǎng)期支持的方法, 包括多線程, 調(diào)用子進(jìn)程, 以及各種各樣的關(guān)于生成器函數(shù)的技巧. 這一章將會(huì)給出并發(fā)編程各種方面的技巧, 包括通用的多線程技術(shù)以及并行計(jì)算的實(shí)現(xiàn)方法.

像經(jīng)驗(yàn)豐富的程序員所知道的那樣, 大家擔(dān)心并發(fā)的程序有潛在的危險(xiǎn). 因此, 本章的主要目標(biāo)之一是給出更加可信賴和易調(diào)試的代碼.

啟動(dòng)與停止線程

問(wèn)題

你要為需要并發(fā)執(zhí)行的代碼創(chuàng)建/銷毀線程

解決方案

threading庫(kù)可以在單獨(dú)的線程中執(zhí)行任何的在 Python 中可以調(diào)用的對(duì)象。你可以創(chuàng)建一個(gè) Thread 對(duì)象并將你要執(zhí)行的對(duì)象以 target 參數(shù)的形式提供給該對(duì)象。 下面是一個(gè)簡(jiǎn)單的例子:

# Code to execute in an independent thread
import time
def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create and launch a thread
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()

當(dāng)你創(chuàng)建好一個(gè)線程對(duì)象后,該對(duì)象并不會(huì)立即執(zhí)行,除非你調(diào)用它的 start() 方法(當(dāng)你調(diào)用 start() 方法時(shí),它會(huì)調(diào)用你傳遞進(jìn)來(lái)的函數(shù),并把你傳遞進(jìn)來(lái)的參數(shù)傳遞給該函數(shù))。Python 中的線程會(huì)在一個(gè)單獨(dú)的系統(tǒng)級(jí)線程中執(zhí)行(比如說(shuō)一個(gè) POSIX 線程或者一個(gè) Windows 線程),這些線程將由操作系統(tǒng)來(lái)全權(quán)管理。線程一旦啟動(dòng),將獨(dú)立執(zhí)行直到目標(biāo)函數(shù)返回。你可以查詢一個(gè)線程對(duì)象的狀態(tài),看它是否還在執(zhí)行:

if t.is_alive():
    print('Still running')
else:
    print('Completed')

你也可以將一個(gè)線程加入到當(dāng)前線程,并等待它終止:

t.join()

Python 解釋器在所有線程都終止后才繼續(xù)執(zhí)行代碼剩余的部分。對(duì)于需要長(zhǎng)時(shí)間運(yùn)行的線程或者需要一直運(yùn)行的后臺(tái)任務(wù),你應(yīng)當(dāng)考慮使用后臺(tái)線程。 例如:

t = Thread(target=countdown, args=(10,), daemon=True)
t.start()

后臺(tái)線程無(wú)法等待,不過(guò),這些線程會(huì)在主線程終止時(shí)自動(dòng)銷毀。 除了如上所示的兩個(gè)操作,并沒(méi)有太多可以對(duì)線程做的事情。你無(wú)法結(jié)束一個(gè)線程,無(wú)法給它發(fā)送信號(hào),無(wú)法調(diào)整它的調(diào)度,也無(wú)法執(zhí)行其他高級(jí)操作。如果需要這些特性,你需要自己添加。比如說(shuō),如果你需要終止線程,那么這個(gè)線程必須通過(guò)編程在某個(gè)特定點(diǎn)輪詢來(lái)退出。你可以像下邊這樣把線程放入一個(gè)類中:

class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(5)

    c = CountdownTask()
    t = Thread(target=c.run, args=(10,))
    t.start()
    c.terminate() # Signal termination
    t.join()      # Wait for actual termination (if needed)

如果線程執(zhí)行一些像 I/O 這樣的阻塞操作,那么通過(guò)輪詢來(lái)終止線程將使得線程之間的協(xié)調(diào)變得非常棘手。比如,如果一個(gè)線程一直阻塞在一個(gè) I/O 操作上,它就永遠(yuǎn)無(wú)法返回,也就無(wú)法檢查自己是否已經(jīng)被結(jié)束了。要正確處理這些問(wèn)題,你需要利用超時(shí)循環(huán)來(lái)小心操作線程。 例子如下:

class IOTask:
    def terminate(self):
        self._running = False

    def run(self, sock):
        # sock is a socket
        sock.settimeout(5)        # Set timeout period
        while self._running:
            # Perform a blocking I/O operation w/ timeout
            try:
                data = sock.recv(8192)
                break
            except socket.timeout:
                continue
            # Continued processing
            ...
        # Terminated
        return

討論

由于全局解釋鎖(GIL)的原因,Python 的線程被限制到同一時(shí)刻只允許一個(gè)線程執(zhí)行這樣一個(gè)執(zhí)行模型。所以,Python 的線程更適用于處理 I/O 和其他需要并發(fā)執(zhí)行的阻塞操作(比如等待 I/O、等待從數(shù)據(jù)庫(kù)獲取數(shù)據(jù)等等),而不是需要多處理器并行的計(jì)算密集型任務(wù)。

有時(shí)你會(huì)看到下邊這種通過(guò)繼承 Thread 類來(lái)實(shí)現(xiàn)的線程:

from threading import Thread

class CountdownThread(Thread):
    def __init__(self, n):
        super().__init__()
        self.n = 0
    def run(self):
        while self.n > 0:

            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)

c = CountdownThread(5)
c.start()

盡管這樣也可以工作,但這使得你的代碼依賴于 threading 庫(kù),所以你的這些代碼只能在線程上下文中使用。上文所寫的那些代碼、函數(shù)都是與 threading 庫(kù)無(wú)關(guān)的,這樣就使得這些代碼可以被用在其他的上下文中,可能與線程有關(guān),也可能與線程無(wú)關(guān)。比如,你可以通過(guò) multiprocessing 模塊在一個(gè)單獨(dú)的進(jìn)程中執(zhí)行你的代碼:

import multiprocessing
c = CountdownTask(5)
p = multiprocessing.Process(target=c.run)
p.start()

再次重申,這段代碼僅適用于 CountdownTask 類是以獨(dú)立于實(shí)際的并發(fā)手段(多線程、多進(jìn)程等等)實(shí)現(xiàn)的情況。

判斷線程是否已經(jīng)啟動(dòng)

問(wèn)題

你已經(jīng)啟動(dòng)了一個(gè)線程,但是你想知道它是不是真的已經(jīng)開始運(yùn)行了。

解決方案

線程的一個(gè)關(guān)鍵特性是每個(gè)線程都是獨(dú)立運(yùn)行且狀態(tài)不可預(yù)測(cè)。如果程序中的其他線程需要通過(guò)判斷某個(gè)線程的狀態(tài)來(lái)確定自己下一步的操作,這時(shí)線程同步問(wèn)題就會(huì)變得非常棘手。為了解決這些問(wèn)題,我們需要使用 threading 庫(kù)中的 Event 對(duì)象。 Event 對(duì)象包含一個(gè)可由線程設(shè)置的信號(hào)標(biāo)志,它允許線程等待某些事件的發(fā)生。在初始情況下,event 對(duì)象中的信號(hào)標(biāo)志被設(shè)置為假。如果有線程等待一個(gè) event 對(duì)象,而這個(gè) event 對(duì)象的標(biāo)志為假,那么這個(gè)線程將會(huì)被一直阻塞直至該標(biāo)志為真。一個(gè)線程如果將一個(gè) event 對(duì)象的信號(hào)標(biāo)志設(shè)置為真,它將喚醒所有等待這個(gè) event 對(duì)象的線程。如果一個(gè)線程等待一個(gè)已經(jīng)被設(shè)置為真的 event 對(duì)象,那么它將忽略這個(gè)事件,繼續(xù)執(zhí)行。 下邊的代碼展示了如何使用 Event 來(lái)協(xié)調(diào)線程的啟動(dòng):

from threading import Thread, Event
import time

# Code to execute in an independent thread
def countdown(n, started_evt):
    print('countdown starting')
    started_evt.set()
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create the event object that will be used to signal startup
started_evt = Event()

# Launch the thread and pass the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10,started_evt))
t.start()

# Wait for the thread to start
started_evt.wait()
print('countdown is running')

當(dāng)你執(zhí)行這段代碼,“countdown is running” 總是顯示在 “countdown starting” 之后顯示。這是由于使用 event 來(lái)協(xié)調(diào)線程,使得主線程要等到 countdown() 函數(shù)輸出啟動(dòng)信息后,才能繼續(xù)執(zhí)行。

討論

event 對(duì)象最好單次使用,就是說(shuō),你創(chuàng)建一個(gè) event 對(duì)象,讓某個(gè)線程等待這個(gè)對(duì)象,一旦這個(gè)對(duì)象被設(shè)置為真,你就應(yīng)該丟棄它。盡管可以通過(guò) clear() 方法來(lái)重置 event 對(duì)象,但是很難確保安全地清理 event 對(duì)象并對(duì)它重新賦值。很可能會(huì)發(fā)生錯(cuò)過(guò)事件、死鎖或者其他問(wèn)題(特別是,你無(wú)法保證重置 event 對(duì)象的代碼會(huì)在線程再次等待這個(gè) event 對(duì)象之前執(zhí)行)。如果一個(gè)線程需要不停地重復(fù)使用 event 對(duì)象,你最好使用 Condition 對(duì)象來(lái)代替。下面的代碼使用 Condition 對(duì)象實(shí)現(xiàn)了一個(gè)周期定時(shí)器,每當(dāng)定時(shí)器超時(shí)的時(shí)候,其他線程都可以監(jiān)測(cè)到:

import threading
import time

class PeriodicTimer:
    def __init__(self, interval):
        self._interval = interval
        self._flag = 0
        self._cv = threading.Condition()

    def start(self):
        t = threading.Thread(target=self.run)
        t.daemon = True

        t.start()

    def run(self):
        '''
        Run the timer and notify waiting threads after each interval
        '''
        while True:
            time.sleep(self._interval)
            with self._cv:
                 self._flag ^= 1
                 self._cv.notify_all()

    def wait_for_tick(self):
        '''
        Wait for the next tick of the timer
        '''
        with self._cv:
            last_flag = self._flag
            while last_flag == self._flag:
                self._cv.wait()

# Example use of the timer
ptimer = PeriodicTimer(5)
ptimer.start()

# Two threads that synchronize on the timer
def countdown(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print('T-minus', nticks)
        nticks -= 1

def countup(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print('Counting', n)
        n += 1

threading.Thread(target=countdown, args=(10,)).start()
threading.Thread(target=countup, args=(5,)).start()

event 對(duì)象的一個(gè)重要特點(diǎn)是當(dāng)它被設(shè)置為真時(shí)會(huì)喚醒所有等待它的線程。如果你只想喚醒單個(gè)線程,最好是使用信號(hào)量或者 Condition 對(duì)象來(lái)替代??紤]一下這段使用信號(hào)量實(shí)現(xiàn)的代碼:

# Worker thread
def worker(n, sema):
    # Wait to be signaled
    sema.acquire()

    # Do some work
    print('Working', n)

# Create some threads
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema,))
    t.start()

運(yùn)行上邊的代碼將會(huì)啟動(dòng)一個(gè)線程池,但是并沒(méi)有什么事情發(fā)生。這是因?yàn)樗械木€程都在等待獲取信號(hào)量。每次信號(hào)量被釋放,只有一個(gè)線程會(huì)被喚醒并執(zhí)行,示例如下:

>>> sema.release()
Working 0
>>> sema.release()
Working 1
>>>

編寫涉及到大量的線程間同步問(wèn)題的代碼會(huì)讓你痛不欲生。比較合適的方式是使用隊(duì)列來(lái)進(jìn)行線程間通信或者每個(gè)把線程當(dāng)作一個(gè) Actor,利用 Actor 模型來(lái)控制并發(fā)。下一節(jié)將會(huì)介紹到隊(duì)列,而 Actor 模型將在12.10節(jié)介紹。

線程間通信

問(wèn)題

你的程序中有多個(gè)線程,你需要在這些線程之間安全地交換信息或數(shù)據(jù)

解決方案

從一個(gè)線程向另一個(gè)線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫(kù)中的隊(duì)列了。創(chuàng)建一個(gè)被多個(gè)線程共享的 Queue 對(duì)象,這些線程通過(guò)使用put()get() 操作來(lái)向隊(duì)列中添加或者刪除元素。 例如:

Queue對(duì)象已經(jīng)包含了必要的鎖,所以你可以通過(guò)它在多個(gè)線程間多安全地共享數(shù)據(jù)。 當(dāng)使用隊(duì)列時(shí),協(xié)調(diào)生產(chǎn)者和消費(fèi)者的關(guān)閉問(wèn)題可能會(huì)有一些麻煩。一個(gè)通用的解決方法是在隊(duì)列中放置一個(gè)特殊的只,當(dāng)消費(fèi)者讀到這個(gè)值的時(shí)候,終止執(zhí)行。例如:

from queue import Queue
from threading import Thread

# Object that signals shutdown
_sentinel = object()

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

    # Put the sentinel on the queue to indicate completion
    out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Check for termination
        if data is _sentinel:
            in_q.put(_sentinel)
            break

        # Process the data
        ...

本例中有一個(gè)特殊的地方:消費(fèi)者在讀到這個(gè)特殊值之后立即又把它放回到隊(duì)列中,將之傳遞下去。這樣,所有監(jiān)聽(tīng)這個(gè)隊(duì)列的消費(fèi)者線程就可以全部關(guān)閉了。 盡管隊(duì)列是最常見(jiàn)的線程間通信機(jī)制,但是仍然可以自己通過(guò)創(chuàng)建自己的數(shù)據(jù)結(jié)構(gòu)并添加所需的鎖和同步機(jī)制來(lái)實(shí)現(xiàn)線程間通信。最常見(jiàn)的方法是使用Condition變量來(lái)包裝你的數(shù)據(jù)結(jié)構(gòu)。下邊這個(gè)例子演示了如何創(chuàng)建一個(gè)線程安全的優(yōu)先級(jí)隊(duì)列,如同1.5節(jié)中介紹的那樣。

import heapq
import threading

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()

    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]

使用隊(duì)列來(lái)進(jìn)行線程間通信是一個(gè)單向、不確定的過(guò)程。通常情況下,你沒(méi)有辦法知道接收數(shù)據(jù)的線程是什么時(shí)候接收到的數(shù)據(jù)并開始工作的。不過(guò)隊(duì)列對(duì)象提供一些基本完成的特性,比如下邊這個(gè)例子中的task_done()join()

from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Process the data
        ...
        # Indicate completion
        in_q.task_done()

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

# Wait for all produced items to be consumed
q.join()

如果一個(gè)線程需要在一個(gè)“消費(fèi)者”線程處理完特定的數(shù)據(jù)項(xiàng)時(shí)立即得到通知,你可以把要發(fā)送的數(shù)據(jù)和一個(gè) Event 放到一起使用,這樣“生產(chǎn)者”就可以通過(guò)這個(gè) Event 對(duì)象來(lái)監(jiān)測(cè)處理的過(guò)程了。示例如下:

from queue import Queue
from threading import Thread, Event

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        # Make an (data, event) pair and hand it to the consumer
        evt = Event()
        out_q.put((data, evt))
        ...
        # Wait for the consumer to process the item
        evt.wait()

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data, evt = in_q.get()
        # Process the data
        ...
        # Indicate completion
        evt.set()

討論

基于簡(jiǎn)單隊(duì)列編寫多線程程序在多數(shù)情況下是一個(gè)比較明智的選擇。從線程安全隊(duì)列的底層實(shí)現(xiàn)來(lái)看,你無(wú)需在你的代碼中使用鎖和其他底層的同步機(jī)制,這些只會(huì)把你的程序弄得亂七八糟。此外,使用隊(duì)列這種基于消息的通信機(jī)制可以被擴(kuò)展到更大的應(yīng)用范疇,比如,你可以把你的程序放入多個(gè)進(jìn)程甚至是分布式系統(tǒng)而無(wú)需改變底層的隊(duì)列結(jié)構(gòu)。 使用線程隊(duì)列有一個(gè)要注意的問(wèn)題是,向隊(duì)列中添加數(shù)據(jù)項(xiàng)時(shí)并不會(huì)復(fù)制此數(shù)據(jù)項(xiàng),線程間通信實(shí)際上是在線程間傳遞對(duì)象引用。如果你擔(dān)心對(duì)象的共享狀態(tài),那你最好只傳遞不可修改的數(shù)據(jù)結(jié)構(gòu)(如:整型、字符串或者元組)或者一個(gè)對(duì)象的深拷貝。例如:

from queue import Queue
from threading import Thread
import copy

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(copy.deepcopy(data))

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...

Queue 對(duì)象提供一些在當(dāng)前上下文很有用的附加特性。比如在創(chuàng)建 Queue 對(duì)象時(shí)提供可選的size 參數(shù)來(lái)限制可以添加到隊(duì)列中的元素?cái)?shù)量。對(duì)于“生產(chǎn)者”與“消費(fèi)者”速度有差異的情況,為隊(duì)列中的元素?cái)?shù)量添加上限是有意義的。比如,一個(gè)“生產(chǎn)者”產(chǎn)生項(xiàng)目的速度比“消費(fèi)者” “消費(fèi)”的速度快,那么使用固定大小的隊(duì)列就可以在隊(duì)列已滿的時(shí)候阻塞隊(duì)列,以免未預(yù)期的連鎖效應(yīng)擴(kuò)散整個(gè)程序造成死鎖或者程序運(yùn)行失常。在通信的線程之間進(jìn)行“流量控制”是一個(gè)看起來(lái)容易實(shí)現(xiàn)起來(lái)困難的問(wèn)題。如果你發(fā)現(xiàn)自己曾經(jīng)試圖通過(guò)擺弄隊(duì)列大小來(lái)解決一個(gè)問(wèn)題,這也許就標(biāo)志著你的程序可能存在脆弱設(shè)計(jì)或者固有的可伸縮問(wèn)題。 get()put()方法都支持非阻塞方式和設(shè)定超時(shí),例如:

import queue
q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    ...

try:
    q.put(item, block=False)
except queue.Full:
    ...

try:
    data = q.get(timeout=5.0)
except queue.Empty:
    ...

這些操作都可以用來(lái)避免當(dāng)執(zhí)行某些特定隊(duì)列操作時(shí)發(fā)生無(wú)限阻塞的情況,比如,一個(gè)非阻塞的put() 方法和一個(gè)固定大小的隊(duì)列一起使用,這樣當(dāng)隊(duì)列已滿時(shí)就可以執(zhí)行不同的代碼。比如輸出一條日志信息并丟棄。

def producer(q):
    ...
    try:
        q.put(item, block=False)
    except queue.Full:
        log.warning('queued item %r discarded!', item)

如果你試圖讓消費(fèi)者線程在執(zhí)行像 q.get()這樣的操作時(shí),超時(shí)自動(dòng)終止以便檢查終止標(biāo)志,你應(yīng)該使用q.get()的可選參數(shù) timeout,如下:

_running = True

def consumer(q):
    while _running:
        try:
            item = q.get(timeout=5.0)
            # Process item
            ...
        except queue.Empty:
            pass

最后,有 q.qsize()q.full(),q.empty() 等實(shí)用方法可以獲取一個(gè)隊(duì)列的當(dāng)前大小和狀態(tài)。但要注意,這些方法都不是線程安全的。可能你對(duì)一個(gè)隊(duì)列使用 empty()判斷出這個(gè)隊(duì)列為空,但同時(shí)另外一個(gè)線程可能已經(jīng)向這個(gè)隊(duì)列中插入一個(gè)數(shù)據(jù)項(xiàng)。所以,你最好不要在你的代碼中使用這些方法。

給關(guān)鍵部分加鎖

問(wèn)題

你需要對(duì)多線程程序中的臨界區(qū)加鎖以避免競(jìng)爭(zhēng)條件。

解決方案

要在多線程程序中安全使用可變對(duì)象,你需要使用 threading 庫(kù)中的 Lock 對(duì)象,就像下邊這個(gè)例子這樣:

import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with self._value_lock:
             self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        with self._value_lock:
             self._value -= delta

Lock 對(duì)象和 with 語(yǔ)句塊一起使用可以保證互斥執(zhí)行,就是每次只有一個(gè)線程可以執(zhí)行 with 語(yǔ)句包含的代碼塊。with 語(yǔ)句會(huì)在這個(gè)代碼塊執(zhí)行前自動(dòng)獲取鎖,在執(zhí)行結(jié)束后自動(dòng)釋放鎖。

討論

線程調(diào)度本質(zhì)上是不確定的,因此,在多線程程序中錯(cuò)誤地使用鎖機(jī)制可能會(huì)導(dǎo)致隨機(jī)數(shù)據(jù)損壞或者其他的異常行為,我們稱之為競(jìng)爭(zhēng)條件。為了避免競(jìng)爭(zhēng)條件,最好只在臨界區(qū)(對(duì)臨界資源進(jìn)行操作的那部分代碼)使用鎖。 在一些“老的” Python 代碼中,顯式獲取和釋放鎖是很常見(jiàn)的。下邊是一個(gè)上一個(gè)例子的變種:

import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        self._value_lock.acquire()
        self._value += delta
        self._value_lock.release()

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        self._value_lock.acquire()
        self._value -= delta
        self._value_lock.release()

相比于這種顯式調(diào)用的方法,with 語(yǔ)句更加優(yōu)雅,也更不容易出錯(cuò),特別是程序員可能會(huì)忘記調(diào)用 release() 方法或者程序在獲得鎖之后產(chǎn)生異常這兩種情況(使用 with 語(yǔ)句可以保證在這兩種情況下仍能正確釋放鎖)。 為了避免出現(xiàn)死鎖的情況,使用鎖機(jī)制的程序應(yīng)該設(shè)定為每個(gè)線程一次只允許獲取一個(gè)鎖。如果不能這樣做的話,你就需要更高級(jí)的死鎖避免機(jī)制,我們將在12.5節(jié)介紹。 在 threading庫(kù)中還提供了其他的同步原語(yǔ),比如 RLoctSemaphore 對(duì)象。但是根據(jù)以往經(jīng)驗(yàn),這些原語(yǔ)是用于一些特殊的情況,如果你只是需要簡(jiǎn)單地對(duì)可變對(duì)象進(jìn)行鎖定,那就不應(yīng)該使用它們。一個(gè) RLock(可重入鎖)可以被同一個(gè)線程多次獲取,主要用來(lái)實(shí)現(xiàn)基于監(jiān)測(cè)對(duì)象模式的鎖定和同步。在使用這種鎖的情況下,當(dāng)鎖被持有時(shí),只有一個(gè)線程可以使用完整的函數(shù)或者類中的方法。比如,你可以實(shí)現(xiàn)一個(gè)這樣的 SharedCounter 類:

import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    _lock = threading.RLock()
    def __init__(self, initial_value = 0):
        self._value = initial_value

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with SharedCounter._lock:
            self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        with SharedCounter._lock:
             self.incr(-delta)

在上邊這個(gè)例子中,沒(méi)有對(duì)每一個(gè)實(shí)例中的可變對(duì)象加鎖,取而代之的是一個(gè)被所有實(shí)例共享的類級(jí)鎖。這個(gè)鎖用來(lái)同步類方法,具體來(lái)說(shuō)就是,這個(gè)鎖可以保證一次只有一個(gè)線程可以調(diào)用這個(gè)類方法。不過(guò),與一個(gè)標(biāo)準(zhǔn)的鎖不同的是,已經(jīng)持有這個(gè)鎖的方法在調(diào)用同樣使用這個(gè)鎖的方法時(shí),無(wú)需再次獲取鎖。比如 decr 方法。 這種實(shí)現(xiàn)方式的一個(gè)特點(diǎn)是,無(wú)論這個(gè)類有多少個(gè)實(shí)例都只用一個(gè)鎖。因此在需要大量使用計(jì)數(shù)器的情況下內(nèi)存效率更高。不過(guò)這樣做也有缺點(diǎn),就是在程序中使用大量線程并頻繁更新計(jì)數(shù)器時(shí)會(huì)有爭(zhēng)用鎖的問(wèn)題。 信號(hào)量對(duì)象是一個(gè)建立在共享計(jì)數(shù)器基礎(chǔ)上的同步原語(yǔ)。如果計(jì)數(shù)器不為0,with 語(yǔ)句將計(jì)數(shù)器減1,線程被允許執(zhí)行。with 語(yǔ)句執(zhí)行結(jié)束后,計(jì)數(shù)器加1。如果計(jì)數(shù)器為0,線程將被阻塞,直到其他線程結(jié)束將計(jì)數(shù)器加1。盡管你可以在程序中像標(biāo)準(zhǔn)鎖一樣使用信號(hào)量來(lái)做線程同步,但是這種方式并不被推薦,因?yàn)槭褂眯盘?hào)量為程序增加的復(fù)雜性會(huì)影響程序性能。相對(duì)于簡(jiǎn)單地作為鎖使用,信號(hào)量更適用于那些需要在線程之間引入信號(hào)或者限制的程序。比如,你需要限制一段代碼的并發(fā)訪問(wèn)量,你就可以像下面這樣使用信號(hào)量完成:

from threading import Semaphore
import urllib.request

# At most, five threads allowed to run at once
_fetch_url_sema = Semaphore(5)

def fetch_url(url):
    with _fetch_url_sema:
        return urllib.request.urlopen(url)

如果你對(duì)線程同步原語(yǔ)的底層理論和實(shí)現(xiàn)感興趣,可以參考操作系統(tǒng)相關(guān)書籍,絕大多數(shù)都有提及。

防止死鎖的加鎖機(jī)制

問(wèn)題

你正在寫一個(gè)多線程程序,其中線程需要一次獲取多個(gè)鎖,此時(shí)如何避免死鎖問(wèn)題。

解決方案

在多線程程序中,死鎖問(wèn)題很大一部分是由于線程同時(shí)獲取多個(gè)鎖造成的。舉個(gè)例子:一個(gè)線程獲取了第一個(gè)鎖,然后在獲取第二個(gè)鎖的時(shí)候發(fā)生阻塞,那么這個(gè)線程就可能阻塞其他線程的執(zhí)行,從而導(dǎo)致整個(gè)程序假死。 解決死鎖問(wèn)題的一種方案是為程序中的每一個(gè)鎖分配一個(gè)唯一的 id,然后只允許按照升序規(guī)則來(lái)使用多個(gè)鎖,這個(gè)規(guī)則使用上下文管理器 是非常容易實(shí)現(xiàn)的,示例如下:

如何使用這個(gè)上下文管理器呢?你可以按照正常途徑創(chuàng)建一個(gè)鎖對(duì)象,但不論是單個(gè)鎖還是多個(gè)鎖中都使用 acquire()函數(shù)來(lái)申請(qǐng)鎖, 示例如下:

如果你執(zhí)行這段代碼,你會(huì)發(fā)現(xiàn)它即使在不同的函數(shù)中以不同的順序獲取鎖也沒(méi)有發(fā)生死鎖。 其關(guān)鍵在于,在第一段代碼中,我們對(duì)這些鎖進(jìn)行了排序。通過(guò)排序,使得不管用戶以什么樣的順序來(lái)請(qǐng)求鎖,這些鎖都會(huì)按照固定的順序被獲取。 如果有多個(gè) acquire() 操作被嵌套調(diào)用,可以通過(guò)線程本地存儲(chǔ)(TLS)來(lái)檢測(cè)潛在的死鎖問(wèn)題。 假設(shè)你的代碼是這樣寫的:

如果你運(yùn)行這個(gè)版本的代碼,必定會(huì)有一個(gè)線程發(fā)生崩潰,異常信息可能像這樣:

發(fā)生崩潰的原因在于,每個(gè)線程都記錄著自己已經(jīng)獲取到的鎖。acquire()函數(shù)會(huì)檢查之前已經(jīng)獲取的鎖列表, 由于鎖是按照升序排列獲取的,所以函數(shù)會(huì)認(rèn)為之前已獲取的鎖的 id 必定小于新申請(qǐng)到的鎖,這時(shí)就會(huì)觸發(fā)異常。

討論

死鎖是每一個(gè)多線程程序都會(huì)面臨的一個(gè)問(wèn)題(就像它是每一本操作系統(tǒng)課本的共同話題一樣)。根據(jù)經(jīng)驗(yàn)來(lái)講,盡可能保證每一個(gè) 線程只能同時(shí)保持一個(gè)鎖,這樣程序就不會(huì)被死鎖問(wèn)題所困擾。一旦有線程同時(shí)申請(qǐng)多個(gè)鎖,一切就不可預(yù)料了。

死鎖的檢測(cè)與恢復(fù)是一個(gè)幾乎沒(méi)有優(yōu)雅的解決方案的擴(kuò)展話題。一個(gè)比較常用的死鎖檢測(cè)與恢復(fù)的方案是引入看門狗計(jì)數(shù)器。當(dāng)線程正常 運(yùn)行的時(shí)候會(huì)每隔一段時(shí)間重置計(jì)數(shù)器,在沒(méi)有發(fā)生死鎖的情況下,一切都正常進(jìn)行。一旦發(fā)生死鎖,由于無(wú)法重置計(jì)數(shù)器導(dǎo)致定時(shí)器 超時(shí),這時(shí)程序會(huì)通過(guò)重啟自身恢復(fù)到正常狀態(tài)。

避免死鎖是另外一種解決死鎖問(wèn)題的方式,在進(jìn)程獲取鎖的時(shí)候會(huì)嚴(yán)格按照對(duì)象id升序排列獲取,經(jīng)過(guò)數(shù)學(xué)證明,這樣保證程序不會(huì)進(jìn)入 死鎖狀態(tài)。證明就留給讀者作為練習(xí)了。避免死鎖的主要思想是,單純地按照對(duì)象 id 遞增的順序加鎖不會(huì)產(chǎn)生循環(huán)依賴,而循環(huán)依賴是 死鎖的一個(gè)必要條件,從而避免程序進(jìn)入死鎖狀態(tài)。

下面以一個(gè)關(guān)于線程死鎖的經(jīng)典問(wèn)題:“哲學(xué)家就餐問(wèn)題”,作為本節(jié)最后一個(gè)例子。題目是這樣的:五位哲學(xué)家圍坐在一張桌子前,每個(gè)人 面前有一碗飯和一只筷子。在這里每個(gè)哲學(xué)家可以看做是一個(gè)獨(dú)立的線程,而每只筷子可以看做是一個(gè)鎖。每個(gè)哲學(xué)家可以處在靜坐、 思考、吃飯三種狀態(tài)中的一個(gè)。需要注意的是,每個(gè)哲學(xué)家吃飯是需要兩只筷子的,這樣問(wèn)題就來(lái)了:如果每個(gè)哲學(xué)家都拿起自己左邊的筷子, 那么他們五個(gè)都只能拿著一只筷子坐在那兒,直到餓死。此時(shí)他們就進(jìn)入了死鎖狀態(tài)。 下面是一個(gè)簡(jiǎn)單的使用死鎖避免機(jī)制解決“哲學(xué)家就餐問(wèn)題”的實(shí)現(xiàn):

最后,要特別注意到,為了避免死鎖,所有的加鎖操作必須使用 acquire()函數(shù)。如果代碼中的某部分繞過(guò) acquire 函數(shù)直接申請(qǐng)鎖,那么整個(gè)死鎖避免機(jī)制就不起作用了。

保存線程的狀態(tài)信息

問(wèn)題

你需要保存正在運(yùn)行線程的狀態(tài),這個(gè)狀態(tài)對(duì)于其他的線程是不可見(jiàn)的。

解決方案

有時(shí)在多線程編程中,你需要只保存當(dāng)前運(yùn)行線程的狀態(tài)。 要這么做,可使用 thread.local() 創(chuàng)建一個(gè)本地線程存儲(chǔ)對(duì)象。 對(duì)這個(gè)對(duì)象的屬性的保存和讀取操作都只會(huì)對(duì)執(zhí)行線程可見(jiàn),而其他線程并不可見(jiàn)。

作為使用本地存儲(chǔ)的一個(gè)有趣的實(shí)際例子, 考慮在8.3小節(jié)定義過(guò)的 LazyConnection 上下文管理器類。 下面我們對(duì)它進(jìn)行一些小的修改使得它可以適用于多線程:

from socket import socket, AF_INET, SOCK_STREAM
import threading

class LazyConnection:
    def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
        self.address = address
        self.family = AF_INET
        self.type = SOCK_STREAM
        self.local = threading.local()

    def __enter__(self):
        if hasattr(self.local, 'sock'):
            raise RuntimeError('Already connected')
        self.local.sock = socket(self.family, self.type)
        self.local.sock.connect(self.address)
        return self.local.sock

    def __exit__(self, exc_ty, exc_val, tb):
        self.local.sock.close()
        del self.local.sock

代碼中,自己觀察對(duì)于self.local屬性的使用。 它被初始化尾一個(gè) threading.local()實(shí)例。 其他方法操作被存儲(chǔ)為 self.local.sock的套接字對(duì)象。 有了這些就可以在多線程中安全的使用 LazyConnection實(shí)例了。例如:

from functools import partial
def test(conn):
    with conn as s:
        s.send(b'GET /index.html HTTP/1.0\r\n')
        s.send(b'Host: www.python.org\r\n')

        s.send(b'\r\n')
        resp = b''.join(iter(partial(s.recv, 8192), b''))

    print('Got {} bytes'.format(len(resp)))

if __name__ == '__main__':
    conn = LazyConnection(('www.python.org', 80))

    t1 = threading.Thread(target=test, args=(conn,))
    t2 = threading.Thread(target=test, args=(conn,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

它之所以行得通的原因是每個(gè)線程會(huì)創(chuàng)建一個(gè)自己專屬的套接字連接(存儲(chǔ)為 self.local.sock)。 因此,當(dāng)不同的線程執(zhí)行套接字操作時(shí),由于操作的是不同的套接字,因此它們不會(huì)相互影響。

討論

在大部分程序中創(chuàng)建和操作線程特定狀態(tài)并不會(huì)有什么問(wèn)題。 不過(guò),當(dāng)出了問(wèn)題的時(shí)候,通常是因?yàn)槟硞€(gè)對(duì)象被多個(gè)線程使用到,用來(lái)操作一些專用的系統(tǒng)資源, 比如一個(gè)套接字或文件。你不能讓所有線程貢獻(xiàn)一個(gè)單獨(dú)對(duì)象, 因?yàn)槎鄠€(gè)線程同時(shí)讀和寫的時(shí)候會(huì)產(chǎn)生混亂。 本地線程存儲(chǔ)通過(guò)讓這些資源只能在被使用的線程中可見(jiàn)來(lái)解決這個(gè)問(wèn)題。

本節(jié)中,使用 thread.local()可以讓 LazyConnection類支持一個(gè)線程一個(gè)連接, 而不是對(duì)于所有的進(jìn)程都只有一個(gè)連接。

其原理是,每個(gè) threading.local() 實(shí)例為每個(gè)線程維護(hù)著一個(gè)單獨(dú)的實(shí)例字典。 所有普通實(shí)例操作比如獲取、修改和刪除值僅僅操作這個(gè)字典。 每個(gè)線程使用一個(gè)獨(dú)立的字典就可以保證數(shù)據(jù)的隔離了。

創(chuàng)建一個(gè)線程池

問(wèn)題

你創(chuàng)建一個(gè)工作者線程池,用來(lái)相應(yīng)客戶端請(qǐng)求或執(zhí)行其他的工作。

解決方案

concurrent.futures 函數(shù)庫(kù)有一個(gè)ThreadPoolExecutor 類可以被用來(lái)完成這個(gè)任務(wù)。 下面是一個(gè)簡(jiǎn)單的 TCP 服務(wù)器,使用了一個(gè)線程池來(lái)響應(yīng)客戶端:

from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):
    '''
    Handle a client connection
    '''
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr):
    pool = ThreadPoolExecutor(128)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

如果你想手動(dòng)創(chuàng)建你自己的線程池, 通??梢允褂靡粋€(gè) Queue 來(lái)輕松實(shí)現(xiàn)。下面是一個(gè)稍微不同但是手動(dòng)實(shí)現(xiàn)的例子:

from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
    '''
    Handle a client connection
    '''
    sock, client_addr = q.get()
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')

    sock.close()

def echo_server(addr, nworkers):
    # Launch the client workers
    q = Queue()
    for n in range(nworkers):
        t = Thread(target=echo_client, args=(q,))
        t.daemon = True
        t.start()

    # Run the server
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        q.put((client_sock, client_addr))

echo_server(('',15000), 128)

使用 ThreadPoolExecutor 相對(duì)于手動(dòng)實(shí)現(xiàn)的一個(gè)好處在于它使得 任務(wù)提交者更方便的從被調(diào)用函數(shù)中獲取返回值。例如,你可能會(huì)像下面這樣寫:

from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
    u = urllib.request.urlopen(url)
    data = u.read()
    return data

pool = ThreadPoolExecutor(10)
# Submit work to the pool
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# Get the results back
x = a.result()
y = b.result()

例子中返回的 handle 對(duì)象會(huì)幫你處理所有的阻塞與協(xié)作,然后從工作線程中返回?cái)?shù)據(jù)給你。 特別的,a.result() 操作會(huì)阻塞進(jìn)程直到對(duì)應(yīng)的函數(shù)執(zhí)行完成并返回一個(gè)結(jié)果。

討論

通常來(lái)講,你應(yīng)該避免編寫線程數(shù)量可以無(wú)限制增長(zhǎng)的程序。例如,看看下面這個(gè)服務(wù)器:

from threading import Thread
from socket import socket, AF_INET, SOCK_STREAM

def echo_client(sock, client_addr):
    '''
    Handle a client connection
    '''
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr, nworkers):
    # Run the server
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        t = Thread(target=echo_client, args=(client_sock, client_addr))
        t.daemon = True
        t.start()

echo_server(('',15000))

盡管這個(gè)也可以工作, 但是它不能抵御有人試圖通過(guò)創(chuàng)建大量線程讓你服務(wù)器資源枯竭而崩潰的攻擊行為。 通過(guò)使用預(yù)先初始化的線程池,你可以設(shè)置同時(shí)運(yùn)行線程的上限數(shù)量。

你可能會(huì)關(guān)心創(chuàng)建大量線程會(huì)有什么后果。 現(xiàn)代操作系統(tǒng)可以很輕松的創(chuàng)建幾千個(gè)線程的線程池。 甚至,同時(shí)幾千個(gè)線程等待工作并不會(huì)對(duì)其他代碼產(chǎn)生性能影響。 當(dāng)然了,如果所有線程同時(shí)被喚醒并立即在 CPU 上執(zhí)行,那就不同了——特別是有了全局解釋器鎖 GIL。 通常,你應(yīng)該只在 I/O 處理相關(guān)代碼中使用線程池。

創(chuàng)建大的線程池的一個(gè)可能需要關(guān)注的問(wèn)題是內(nèi)存的使用。 例如,如果你在 OS X 系統(tǒng)上面創(chuàng)建2000個(gè)線程,系統(tǒng)顯示 Python 進(jìn)程使用了超過(guò) 9 GB 的虛擬內(nèi)存。 不過(guò),這個(gè)計(jì)算通常是有誤差的。當(dāng)創(chuàng)建一個(gè)線程時(shí),操作系統(tǒng)會(huì)預(yù)留一個(gè)虛擬內(nèi)存區(qū)域來(lái) 放置線程的執(zhí)行棧(通常是 8 MB 大?。5沁@個(gè)內(nèi)存只有一小片段被實(shí)際映射到真實(shí)內(nèi)存中。 因此,Python 進(jìn)程使用到的真實(shí)內(nèi)存其實(shí)很小 (比如,對(duì)于2000個(gè)線程來(lái)講,只使用到了 70 MB 的真實(shí)內(nèi)存,而不是 9 GB)。 如果你擔(dān)心虛擬內(nèi)存大小,可以使用 threading.stack_size() 函數(shù)來(lái)降低它。例如:

import threading
threading.stack_size(65536)

如果你加上這條語(yǔ)句并再次運(yùn)行前面的創(chuàng)建2000個(gè)線程試驗(yàn), 你會(huì)發(fā)現(xiàn) Python 進(jìn)程只使用到了大概 210 MB 的虛擬內(nèi)存,而真實(shí)內(nèi)存使用量沒(méi)有變。 注意線程棧大小必須至少為32768字節(jié),通常是系統(tǒng)內(nèi)存頁(yè)大?。?096、8192等)的整數(shù)倍。

簡(jiǎn)單的并行編程

問(wèn)題

你有個(gè)程序要執(zhí)行 CPU 密集型工作,你想讓他利用多核 CPU 的優(yōu)勢(shì)來(lái)運(yùn)行的快一點(diǎn)。

解決方案

concurrent.futures庫(kù)提供了一個(gè) ProcessPoolExecutor 類, 可被用來(lái)在一個(gè)單獨(dú)的 Python 解釋器中執(zhí)行計(jì)算密集型函數(shù)。 不過(guò),要使用它,你首先要有一些計(jì)算密集型的任務(wù)。 我們通過(guò)一個(gè)簡(jiǎn)單而實(shí)際的例子來(lái)演示它。假定你有個(gè) Apache web 服務(wù)器日志目錄的 gzip 壓縮包:

logs/
   20120701.log.gz
   20120702.log.gz
   20120703.log.gz
   20120704.log.gz
   20120705.log.gz
   20120706.log.gz
   ...

進(jìn)一步假設(shè)每個(gè)日志文件內(nèi)容類似下面這樣:

124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...

下面是一個(gè)腳本,在這些日志文件中查找出所有訪問(wèn)過(guò) robots.txt 文件的主機(jī):

# findrobots.py

import gzip
import io
import glob

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)

前面的程序使用了通常的 map-reduce 風(fēng)格來(lái)編寫。 函數(shù) find_robots()在一個(gè)文件名集合上做 map 操作,并將結(jié)果匯總為一個(gè)單獨(dú)的結(jié)果, 也就是 find_all_robots() 函數(shù)中的 all_robots 集合。 現(xiàn)在,假設(shè)你想要修改這個(gè)程序讓它使用多核 CPU。 很簡(jiǎn)單——只需要將 map()操作替換為一個(gè) concurrent.futures 庫(kù)中生成的類似操作即可。 下面是一個(gè)簡(jiǎn)單修改版本:

# findrobots.py

import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file

    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)

通過(guò)這個(gè)修改后,運(yùn)行這個(gè)腳本產(chǎn)生同樣的結(jié)果,但是在四核機(jī)器上面比之前快了3.5倍。 實(shí)際的性能優(yōu)化效果根據(jù)你的機(jī)器 CPU 數(shù)量的不同而不同。

討論

ProcessPoolExecutor 的典型用法如下:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
    ...
    do work in parallel using pool
    ...

其原理是,一個(gè) ProcessPoolExecutor 創(chuàng)建 N 個(gè)獨(dú)立的 Python 解釋器, N 是系統(tǒng)上面可用 CPU 的個(gè)數(shù)。你可以通過(guò)提供可選參數(shù)給ProcessPoolExecutor(N) 來(lái)修改 處理器數(shù)量。這個(gè)處理池會(huì)一直運(yùn)行到 with 塊中最后一個(gè)語(yǔ)句執(zhí)行完成, 然后處理池被關(guān)閉。不過(guò),程序會(huì)一直等待直到所有提交的工作被處理完成。

被提交到池中的工作必須被定義為一個(gè)函數(shù)。有兩種方法去提交。 如果你想讓一個(gè)列表推導(dǎo)或一個(gè) map() 操作并行執(zhí)行的話,可使用pool.map() :

# A function that performs a lot of work
def work(x):
    ...
    return result

# Nonparallel code
results = map(work, data)

# Parallel implementation
with ProcessPoolExecutor() as pool:
    results = pool.map(work, data)

另外,你可以使用pool.submit() 來(lái)手動(dòng)的提交單個(gè)任務(wù):

# Some function
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)

    # Obtaining the result (blocks until done)
    r = future_result.result()
    ...

如果你手動(dòng)提交一個(gè)任務(wù),結(jié)果是一個(gè)Future 實(shí)例。 要獲取最終結(jié)果,你需要調(diào)用它的 result()方法。 它會(huì)阻塞進(jìn)程直到結(jié)果被返回來(lái)。

如果不想阻塞,你還可以使用一個(gè)回調(diào)函數(shù),例如:

def when_done(r):
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
     future_result = pool.submit(work, arg)
     future_result.add_done_callback(when_done)

回調(diào)函數(shù)接受一個(gè) Future 實(shí)例,被用來(lái)獲取最終的結(jié)果(比如通過(guò)調(diào)用它的 result()方法)。 盡管處理池很容易使用,在設(shè)計(jì)大程序的時(shí)候還是有很多需要注意的地方,如下幾點(diǎn):

  • 這種并行處理技術(shù)只適用于那些可以被分解為互相獨(dú)立部分的問(wèn)題。
  • 被提交的任務(wù)必須是簡(jiǎn)單函數(shù)形式。對(duì)于方法、閉包和其他類型的并行執(zhí)行還不支持。
  • 函數(shù)參數(shù)和返回值必須兼容 pickle,因?yàn)橐褂玫竭M(jìn)程間的通信,所有解釋器之間的交換數(shù)據(jù)必須被序列化
  • 被提交的任務(wù)函數(shù)不應(yīng)保留狀態(tài)或有副作用。除了打印日志之類簡(jiǎn)單的事情, 一旦啟動(dòng)你不能控制子進(jìn)程的任何行為,因此最好保持簡(jiǎn)單和純潔——函數(shù)不要去修改環(huán)境。

  • 在 Unix 上進(jìn)程池通過(guò)調(diào)用 fork()系統(tǒng)調(diào)用被創(chuàng)建, 它會(huì)克隆 Python 解釋器,包括 fork 時(shí)的所有程序狀態(tài)。 而在 Windows 上,克隆解釋器時(shí)不會(huì)克隆狀態(tài)。 實(shí)際的 fork 操作會(huì)在第一次調(diào)用pool.map()pool.submit()后發(fā)生。

  • 當(dāng)你混合使用進(jìn)程池和多線程的時(shí)候要特別小心。 你應(yīng)該在創(chuàng)建任何線程之前先創(chuàng)建并激活進(jìn)程池(比如在程序啟動(dòng)的 main 線程中創(chuàng)建進(jìn)程池)。

Python 的全局鎖問(wèn)題

問(wèn)題

你已經(jīng)聽(tīng)說(shuō)過(guò)全局解釋器鎖 GIL,擔(dān)心它會(huì)影響到多線程程序的執(zhí)行性能。

解決方案

盡管 Python 完全支持多線程編程, 但是解釋器的 C 語(yǔ)言實(shí)現(xiàn)部分在完全并行執(zhí)行時(shí)并不是線程安全的。 實(shí)際上,解釋器被一個(gè)全局解釋器鎖保護(hù)著,它確保任何時(shí)候都只有一個(gè) Python 線程執(zhí)行。 GIL 最大的問(wèn)題就是 Python 的多線程程序并不能利用多核 CPU 的優(yōu)勢(shì) (比如一個(gè)使用了多個(gè)線程的計(jì)算密集型程序只會(huì)在一個(gè)單 CPU 上面運(yùn)行)。

在討論普通的 GIL 之前,有一點(diǎn)要強(qiáng)調(diào)的是 GIL 只會(huì)影響到那些嚴(yán)重依賴 CPU 的程序(比如計(jì)算型的)。 如果你的程序大部分只會(huì)設(shè)計(jì)到 I/O,比如網(wǎng)絡(luò)交互,那么使用多線程就很合適, 因?yàn)樗鼈兇蟛糠謺r(shí)間都在等待。實(shí)際上,你完全可以放心的創(chuàng)建幾千個(gè) Python 線程, 現(xiàn)代操作系統(tǒng)運(yùn)行這么多線程沒(méi)有任何壓力,沒(méi)啥可擔(dān)心的。

而對(duì)于依賴 CPU 的程序,你需要弄清楚執(zhí)行的計(jì)算的特點(diǎn)。 例如,優(yōu)化底層算法要比使用多線程運(yùn)行快得多。 類似的,由于 Python 是解釋執(zhí)行的,如果你將那些性能瓶頸代碼移到一個(gè) C 語(yǔ)言擴(kuò)展模塊中, 速度也會(huì)提升的很快。如果你要操作數(shù)組,那么使用 NumPy 這樣的擴(kuò)展會(huì)非常的高效。 最后,你還可以考慮下其他可選實(shí)現(xiàn)方案,比如 PyPy,它通過(guò)一個(gè) JIT 編譯器來(lái)優(yōu)化執(zhí)行效率 (不過(guò)在寫這本書的時(shí)候它還不能支持 Python 3)。

還有一點(diǎn)要注意的是,線程不是專門用來(lái)優(yōu)化性能的。 一個(gè) CPU 依賴型程序可能會(huì)使用線程來(lái)管理一個(gè)圖形用戶界面、一個(gè)網(wǎng)絡(luò)連接或其他服務(wù)。 這時(shí)候,GIL 會(huì)產(chǎn)生一些問(wèn)題,因?yàn)槿绻粋€(gè)線程長(zhǎng)期持有 GIL 的話會(huì)導(dǎo)致其他非 CPU 型線程一直等待。 事實(shí)上,一個(gè)寫的不好的 C 語(yǔ)言擴(kuò)展會(huì)導(dǎo)致這個(gè)問(wèn)題更加嚴(yán)重, 盡管代碼的計(jì)算部分會(huì)比之前運(yùn)行的更快些。

說(shuō)了這么多,現(xiàn)在想說(shuō)的是我們有兩種策略來(lái)解決 GIL 的缺點(diǎn)。 首先,如果你完全工作于 Python 環(huán)境中,你可以使用 multiprocessing模塊來(lái)創(chuàng)建一個(gè)進(jìn)程池, 并像協(xié)同處理器一樣的使用它。例如,加入你有如下的線程代碼:

# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = some_work(args)
    ...

修改代碼,使用進(jìn)程池:

# Processing pool (see below for initiazation)
pool = None

# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = pool.apply(some_work, (args))
        ...

# Initiaze the pool
if __name__ == '__main__':
    import multiprocessing
    pool = multiprocessing.Pool()

這個(gè)通過(guò)使用一個(gè)技巧利用進(jìn)程池解決了 GIL 的問(wèn)題。 當(dāng)一個(gè)線程想要執(zhí)行 CPU 密集型工作時(shí),會(huì)將任務(wù)發(fā)給進(jìn)程池。 然后進(jìn)程池會(huì)在另外一個(gè)進(jìn)程中啟動(dòng)一個(gè)單獨(dú)的 Python 解釋器來(lái)工作。 當(dāng)線程等待結(jié)果的時(shí)候會(huì)釋放 GIL。 并且,由于計(jì)算任務(wù)在單獨(dú)解釋器中執(zhí)行,那么就不會(huì)受限于 GIL 了。 在一個(gè)多核系統(tǒng)上面,你會(huì)發(fā)現(xiàn)這個(gè)技術(shù)可以讓你很好的利用多 CPU 的優(yōu)勢(shì)。

另外一個(gè)解決 GIL 的策略是使用 C 擴(kuò)展編程技術(shù)。 主要思想是將計(jì)算密集型任務(wù)轉(zhuǎn)移給 C,跟 Python 獨(dú)立,在工作的時(shí)候在 C 代碼中釋放 GIL。 這可以通過(guò)在 C 代碼中插入下面這樣的特殊宏來(lái)完成:

#include "Python.h"
...

PyObject *pyfunc(PyObject *self, PyObject *args) {
   ...
   Py_BEGIN_ALLOW_THREADS
   // Threaded C code
   ...
   Py_END_ALLOW_THREADS
   ...
}

如果你使用其他工具訪問(wèn) C 語(yǔ)言,比如對(duì)于 Cython 的 ctypes 庫(kù),你不需要做任何事。 例如,ctypes 在調(diào)用 C 時(shí)會(huì)自動(dòng)釋放 GIL。

討論

許多程序員在面對(duì)線程性能問(wèn)題的時(shí)候,馬上就會(huì)怪罪 GIL,什么都是它的問(wèn)題。 其實(shí)這樣子太不厚道也太天真了點(diǎn)。 作為一個(gè)真實(shí)的例子,在多線程的網(wǎng)絡(luò)編程中神秘的 stalls可能是因?yàn)槠渌虮热缫粋€(gè) DNS 查找延時(shí),而跟 GIL 毫無(wú)關(guān)系。 最后你真的需要先去搞懂你的代碼是否真的被 GIL 影響到。 同時(shí)還要明白 GIL 大部分都應(yīng)該只關(guān)注 CPU 的處理而不是 I/O.

如果你準(zhǔn)備使用一個(gè)處理器池,注意的是這樣做涉及到數(shù)據(jù)序列化和在不同 Python 解釋器通信。 被執(zhí)行的操作需要放在一個(gè)通過(guò) def 語(yǔ)句定義的 Python 函數(shù)中,不能是 lambda、閉包可調(diào)用實(shí)例等, 并且函數(shù)參數(shù)和返回值必須要兼容 pickle。 同樣,要執(zhí)行的任務(wù)量必須足夠大以彌補(bǔ)額外的通宵開銷。

另外一個(gè)難點(diǎn)是當(dāng)混合使用線程和進(jìn)程池的時(shí)候會(huì)讓你很頭疼。 如果你要同時(shí)使用兩者,最好在程序啟動(dòng)時(shí),創(chuàng)建任何線程之前先創(chuàng)建一個(gè)單例的進(jìn)程池。 然后線程使用同樣的進(jìn)程池來(lái)進(jìn)行它們的計(jì)算密集型工作。

C 擴(kuò)展最重要的特征是它們和 Python 解釋器是保持獨(dú)立的。 也就是說(shuō),如果你準(zhǔn)備將 Python 中的任務(wù)分配到 C 中去執(zhí)行, 你需要確保 C 代碼的操作跟 Python 保持獨(dú)立, 這就意味著不要使用 Python 數(shù)據(jù)結(jié)構(gòu)以及不要調(diào)用 Python 的 C API。 另外一個(gè)就是你要確保 C 擴(kuò)展所做的工作是足夠的,值得你這樣做。 也就是說(shuō) C 擴(kuò)展擔(dān)負(fù)起了大量的計(jì)算任務(wù),而不是少數(shù)幾個(gè)計(jì)算。

這些解決 GIL 的方案并不能適用于所有問(wèn)題。 例如,某些類型的應(yīng)用程序如果被分解為多個(gè)進(jìn)程處理的話并不能很好的工作, 也不能將它的部分代碼改成C語(yǔ)言執(zhí)行。 對(duì)于這些應(yīng)用程序,你就要自己需求解決方案了 (比如多進(jìn)程訪問(wèn)共享內(nèi)存區(qū),多解析器運(yùn)行于同一個(gè)進(jìn)程等)。 或者,你還可以考慮下其他的解釋器實(shí)現(xiàn),比如 PyPy。

了解更多關(guān)于在 C 擴(kuò)展中釋放 GIL,請(qǐng)參考15.7和15.10小節(jié)。

定義一個(gè) Actor 任務(wù)

問(wèn)題

你想定義跟 actor 模式中類似“actors”角色的任務(wù)

解決方案

actore 模式是一種最古老的也是最簡(jiǎn)單的并行和分布式計(jì)算解決方案。 事實(shí)上,它天生的簡(jiǎn)單性是它如此受歡迎的重要原因之一。 簡(jiǎn)單來(lái)講,一個(gè) actor 就是一個(gè)并發(fā)執(zhí)行的任務(wù),只是簡(jiǎn)單的執(zhí)行發(fā)送給它的消息任務(wù)。 響應(yīng)這些消息時(shí),它可能還會(huì)給其他 actor 發(fā)送更進(jìn)一步的消息。 actor 之間的通信是單向和異步的。因此,消息發(fā)送者不知道消息是什么時(shí)候被發(fā)送, 也