鍍金池/ 教程/ C/ 4.2 使用期望等待一次性事件
3.4 本章總結(jié)
6.3 基于鎖設(shè)計(jì)更加復(fù)雜的數(shù)據(jù)結(jié)構(gòu)
6.1 為并發(fā)設(shè)計(jì)的意義何在?
5.2 <code>C++</code>中的原子操作和原子類型
A.7 自動(dòng)推導(dǎo)變量類型
2.1 線程管理的基礎(chǔ)
8.5 在實(shí)踐中設(shè)計(jì)并發(fā)代碼
2.4 運(yùn)行時(shí)決定線程數(shù)量
2.2 向線程函數(shù)傳遞參數(shù)
第4章 同步并發(fā)操作
2.3 轉(zhuǎn)移線程所有權(quán)
8.3 為多線程性能設(shè)計(jì)數(shù)據(jù)結(jié)構(gòu)
6.4 本章總結(jié)
7.3 對(duì)于設(shè)計(jì)無鎖數(shù)據(jù)結(jié)構(gòu)的指導(dǎo)建議
關(guān)于這本書
A.1 右值引用
2.6 本章總結(jié)
D.2 &lt;condition_variable&gt;頭文件
A.6 變參模板
6.2 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)
4.5 本章總結(jié)
A.9 本章總結(jié)
前言
第10章 多線程程序的測(cè)試和調(diào)試
5.4 本章總結(jié)
第9章 高級(jí)線程管理
5.1 內(nèi)存模型基礎(chǔ)
2.5 識(shí)別線程
第1章 你好,C++的并發(fā)世界!
1.2 為什么使用并發(fā)?
A.5 Lambda函數(shù)
第2章 線程管理
4.3 限定等待時(shí)間
D.3 &lt;atomic&gt;頭文件
10.2 定位并發(fā)錯(cuò)誤的技術(shù)
附錄B 并發(fā)庫的簡單比較
5.3 同步操作和強(qiáng)制排序
A.8 線程本地變量
第8章 并發(fā)代碼設(shè)計(jì)
3.3 保護(hù)共享數(shù)據(jù)的替代設(shè)施
附錄D C++線程庫參考
第7章 無鎖并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)
D.7 &lt;thread&gt;頭文件
D.1 &lt;chrono&gt;頭文件
4.1 等待一個(gè)事件或其他條件
A.3 默認(rèn)函數(shù)
附錄A 對(duì)<code>C++</code>11語言特性的簡要介紹
第6章 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)
封面圖片介紹
7.2 無鎖數(shù)據(jù)結(jié)構(gòu)的例子
8.6 本章總結(jié)
8.1 線程間劃分工作的技術(shù)
4.2 使用期望等待一次性事件
8.4 設(shè)計(jì)并發(fā)代碼的注意事項(xiàng)
D.5 &lt;mutex&gt;頭文件
3.1 共享數(shù)據(jù)帶來的問題
資源
9.3 本章總結(jié)
10.3 本章總結(jié)
10.1 與并發(fā)相關(guān)的錯(cuò)誤類型
D.4 &lt;future&gt;頭文件
3.2 使用互斥量保護(hù)共享數(shù)據(jù)
9.1 線程池
1.1 何謂并發(fā)
9.2 中斷線程
4.4 使用同步操作簡化代碼
A.2 刪除函數(shù)
1.3 C++中的并發(fā)和多線程
1.4 開始入門
第5章 C++內(nèi)存模型和原子類型操作
消息傳遞框架與完整的ATM示例
8.2 影響并發(fā)代碼性能的因素
7.1 定義和意義
D.6 &lt;ratio&gt;頭文件
A.4 常量表達(dá)式函數(shù)
7.4 本章總結(jié)
1.5 本章總結(jié)
第3章 線程間共享數(shù)據(jù)

4.2 使用期望等待一次性事件

假設(shè)你乘飛機(jī)去國外度假。當(dāng)你到達(dá)機(jī)場(chǎng),并且辦理完各種登機(jī)手續(xù)后,你還需要等待機(jī)場(chǎng)廣播通知你登機(jī),可能要等很多個(gè)小時(shí)。你可能會(huì)在候機(jī)室里面找一些事情來打發(fā)時(shí)間,比如:讀書,上網(wǎng),或者來一杯價(jià)格不菲的機(jī)場(chǎng)咖啡,不過從根本上來說你就在等待一件事情:機(jī)場(chǎng)廣播能夠登機(jī)的時(shí)間。給定的飛機(jī)班次再之后沒有可參考性;當(dāng)你在再次度假的時(shí)候,你可能會(huì)等待另一班飛機(jī)。

C++標(biāo)準(zhǔn)庫模型將這種一次性事件稱為期望(future)。當(dāng)一個(gè)線程需要等待一個(gè)特定的一次性事件時(shí),在某種程度上來說它就需要知道這個(gè)事件在未來的表現(xiàn)形式。之后,這個(gè)線程會(huì)周期性(較短的周期)的等待或檢查,事件是否觸發(fā)(檢查信息板);在檢查期間也會(huì)執(zhí)行其他任務(wù)(品嘗昂貴的咖啡)。另外,在等待任務(wù)期間它可以先執(zhí)行另外一些任務(wù),直到對(duì)應(yīng)的任務(wù)觸發(fā),而后等待期望的狀態(tài)會(huì)變?yōu)?em>就緒(ready)。一個(gè)“期望”可能是數(shù)據(jù)相關(guān)的(比如,你的登機(jī)口編號(hào)),也可能不是。當(dāng)事件發(fā)生時(shí)(并且期望狀態(tài)為就緒),這個(gè)“期望”就不能被重置。

在C++標(biāo)準(zhǔn)庫中,有兩種“期望”,使用兩種類型模板實(shí)現(xiàn),聲明在頭文件中: 唯一期望(unique futures)(std::future<>)和共享期望(shared futures)(std::shared_future<>)。這是仿照std::unique_ptrstd::shared_ptr。std::future的實(shí)例只能與一個(gè)指定事件相關(guān)聯(lián),而std::shared_future的實(shí)例就能關(guān)聯(lián)多個(gè)事件。后者的實(shí)現(xiàn)中,所有實(shí)例會(huì)在同時(shí)變?yōu)榫途w狀態(tài),并且他們可以訪問與事件相關(guān)的任何數(shù)據(jù)。這種數(shù)據(jù)關(guān)聯(lián)與模板有關(guān),比如std::unique_ptrstd::shared_ptr的模板參數(shù)就是相關(guān)聯(lián)的數(shù)據(jù)類型。在與數(shù)據(jù)無關(guān)的地方,可以使用std::future<void>std::shared_future<void>的特化模板。雖然,我希望用于線程間的通訊,但是“期望”對(duì)象本身并不提供同步訪問。當(dāng)多個(gè)線程需要訪問一個(gè)獨(dú)立“期望”對(duì)象時(shí),他們必須使用互斥量或類似同步機(jī)制對(duì)訪問進(jìn)行保護(hù),如在第3章提到的那樣。不過,在你將要閱讀到的4.2.5節(jié)中,多個(gè)線程會(huì)對(duì)一個(gè)std::shared_future<>實(shí)例的副本進(jìn)行訪問,而不需要期望同步,即使他們是同一個(gè)異步結(jié)果。

最基本的一次性事件,就是一個(gè)后臺(tái)運(yùn)行出的計(jì)算結(jié)果。在第2章中,你已經(jīng)了解了std::thread 執(zhí)行的任務(wù)不能有返回值,并且我能保證,這個(gè)問題將在使用“期望”后解決——現(xiàn)在就來看看是怎么解決的。

4.2.1 帶返回值的后臺(tái)任務(wù)

假設(shè),你有一個(gè)需要長時(shí)間的運(yùn)算,你需要其能計(jì)算出一個(gè)有效的值,但是你現(xiàn)在并不迫切需要這個(gè)值。可能你已經(jīng)找到了生命、宇宙,以及萬物的答案,就像道格拉斯·亞當(dāng)斯[1]一樣。你可以啟動(dòng)一個(gè)新線程來執(zhí)行這個(gè)計(jì)算,但是這就意味著你必須關(guān)注如何傳回計(jì)算的結(jié)果,因?yàn)?code>std::thread并不提供直接接收返回值的機(jī)制。這里就需要std::async函數(shù)模板(也是在頭文<future>中聲明的)了。

當(dāng)任務(wù)的結(jié)果你不著急要時(shí),你可以使用std::async啟動(dòng)一個(gè)異步任務(wù)。與std::thread對(duì)象等待的方式不同,std::async會(huì)返回一個(gè)std::future對(duì)象,這個(gè)對(duì)象持有最終計(jì)算出來的結(jié)果。當(dāng)你需要這個(gè)值時(shí),你只需要調(diào)用這個(gè)對(duì)象的get()成員函數(shù);并且會(huì)阻塞線程直到“期望”狀態(tài)為就緒為止;之后,返回計(jì)算結(jié)果。下面清單中代碼就是一個(gè)簡單的例子。

清單4.6 使用std::future從異步任務(wù)中獲取返回值

#include <future>
#include <iostream>

int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
  std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
  do_other_stuff();
  std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}

std::thread 做的方式一樣,std::async允許你通過添加額外的調(diào)用參數(shù),向函數(shù)傳遞額外的參數(shù)。當(dāng)?shù)谝粋€(gè)參數(shù)是一個(gè)指向成員函數(shù)的指針,第二個(gè)參數(shù)提供有這個(gè)函數(shù)成員類的具體對(duì)象(不是直接的,就是通過指針,還可以包裝在std::ref中),剩余的參數(shù)可作為成員函數(shù)的參數(shù)傳入。否則,第二個(gè)和隨后的參數(shù)將作為函數(shù)的參數(shù),或作為指定可調(diào)用對(duì)象的第一個(gè)參數(shù)。就如std::thread,當(dāng)參數(shù)為右值(rvalues)時(shí),拷貝操作將使用移動(dòng)的方式轉(zhuǎn)移原始數(shù)據(jù)。這就允許使用“只移動(dòng)”類型作為函數(shù)對(duì)象和參數(shù)。來看一下下面的程序清單:

清單4.7 使用std::async向函數(shù)傳遞參數(shù)

#include <string>
#include <future>
struct X
{
  void foo(int,std::string const&);
  std::string bar(std::string const&);
};
X x;
auto f1=std::async(&X::foo,&x,42,"hello");  // 調(diào)用p->foo(42, "hello"),p是指向x的指針
auto f2=std::async(&X::bar,x,"goodbye");  // 調(diào)用tmpx.bar("goodbye"), tmpx是x的拷貝副本
struct Y
{
  double operator()(double);
};
Y y;
auto f3=std::async(Y(),3.141);  // 調(diào)用tmpy(3.141),tmpy通過Y的移動(dòng)構(gòu)造函數(shù)得到
auto f4=std::async(std::ref(y),2.718);  // 調(diào)用y(2.718)
X baz(X&);
std::async(baz,std::ref(x));  // 調(diào)用baz(x)
class move_only
{
public:
  move_only();
  move_only(move_only&&)
  move_only(move_only const&) = delete;
  move_only& operator=(move_only&&);
  move_only& operator=(move_only const&) = delete;

  void operator()();
};
auto f5=std::async(move_only());  // 調(diào)用tmp(),tmp是通過std::move(move_only())構(gòu)造得到

在默認(rèn)情況下,“期望”是否進(jìn)行等待取決于std::async是否啟動(dòng)一個(gè)線程,或是否有任務(wù)正在進(jìn)行同步。在大多數(shù)情況下(估計(jì)這就是你想要的結(jié)果),但是你也可以在函數(shù)調(diào)用之前,向std::async傳遞一個(gè)額外參數(shù)。這個(gè)參數(shù)的類型是std::launch,還可以是std::launch::defered,用來表明函數(shù)調(diào)用被延遲到wait()或get()函數(shù)調(diào)用時(shí)才執(zhí)行,std::launch::async 表明函數(shù)必須在其所在的獨(dú)立線程上執(zhí)行,std::launch::deferred | std::launch::async表明實(shí)現(xiàn)可以選擇這兩種方式的一種。最后一個(gè)選項(xiàng)是默認(rèn)的。當(dāng)函數(shù)調(diào)用被延遲,它可能不會(huì)在運(yùn)行了。如下所示:

auto f6=std::async(std::launch::async,Y(),1.2);  // 在新線程上執(zhí)行
auto f7=std::async(std::launch::deferred,baz,std::ref(x));  // 在wait()或get()調(diào)用時(shí)執(zhí)行
auto f8=std::async(
              std::launch::deferred | std::launch::async,
              baz,std::ref(x));  // 實(shí)現(xiàn)選擇執(zhí)行方式
auto f9=std::async(baz,std::ref(x));
f7.wait();  //  調(diào)用延遲函數(shù)

在本章的后面和第8章中,你將會(huì)再次看到這段程序,使用std::async會(huì)讓分割算法到各個(gè)任務(wù)中變的容易,這樣程序就能并發(fā)的執(zhí)行了。不過,這不是讓一個(gè)std::future與一個(gè)任務(wù)實(shí)例相關(guān)聯(lián)的唯一方式;你也可以將任務(wù)包裝入一個(gè)std::packaged_task<>實(shí)例中,或通過編寫代碼的方式,使用std::promise<>類型模板顯示設(shè)置值。與std::promise<>對(duì)比,std::packaged_task<>具有更高層的抽象,所以我們從“高抽象”的模板說起。

4.2.2 任務(wù)與期望

std::packaged_task<>對(duì)一個(gè)函數(shù)或可調(diào)用對(duì)象,綁定一個(gè)期望。當(dāng)std::packaged_task<> 對(duì)象被調(diào)用,它就會(huì)調(diào)用相關(guān)函數(shù)或可調(diào)用對(duì)象,將期望狀態(tài)置為就緒,返回值也會(huì)被存儲(chǔ)為相關(guān)數(shù)據(jù)。這可以用在構(gòu)建線程池的結(jié)構(gòu)單元(可見第9章),或用于其他任務(wù)的管理,比如在任務(wù)所在線程上運(yùn)行任務(wù),或?qū)⑺鼈冺樞虻倪\(yùn)行在一個(gè)特殊的后臺(tái)線程上。當(dāng)一個(gè)粒度較大的操作可以被分解為獨(dú)立的子任務(wù)時(shí),其中每個(gè)子任務(wù)就可以包含在一個(gè)std::packaged_task<>實(shí)例中,之后這個(gè)實(shí)例將傳遞到任務(wù)調(diào)度器或線程池中。對(duì)任務(wù)的細(xì)節(jié)進(jìn)行抽象,調(diào)度器僅處理std::packaged_task<>實(shí)例,而非處理單獨(dú)的函數(shù)。

std::packaged_task<>的模板參數(shù)是一個(gè)函數(shù)簽名,比如void()就是一個(gè)沒有參數(shù)也沒有返回值的函數(shù),或int(std::string&, double*)就是有一個(gè)非const引用的std::string和一個(gè)指向double類型的指針,并且返回類型是int。當(dāng)你構(gòu)造出一個(gè)std::packaged_task<>實(shí)例時(shí),你必須傳入一個(gè)函數(shù)或可調(diào)用對(duì)象,這個(gè)函數(shù)或可調(diào)用的對(duì)象需要能接收指定的參數(shù)和返回可轉(zhuǎn)換為指定返回類型的值。類型可以不完全匹配;你可以用一個(gè)int類型的參數(shù)和返回一個(gè)float類型的函數(shù),來構(gòu)建std::packaged_task<double(double)>的實(shí)例,因?yàn)樵谶@里,類型可以隱式轉(zhuǎn)換。

指定函數(shù)簽名的返回類型可以用來標(biāo)識(shí),從get_future()返回的std::future<>的類型,不過函數(shù)簽名的參數(shù)列表,可用來指定“打包任務(wù)”的函數(shù)調(diào)用操作符。例如,模板偏特化std::packaged_task<std::string(std::vector<char>*,int)>將在下面的代碼清單中使用。

清單4.8 std::packaged_task<>的偏特化

template<>
class packaged_task<std::string(std::vector<char>*,int)>
{
public:
  template<typename Callable>
  explicit packaged_task(Callable&& f);
  std::future<std::string> get_future();
  void operator()(std::vector<char>*,int);
};

這里的std::packaged_task對(duì)象是一個(gè)可調(diào)用對(duì)象,并且它可以包含在一個(gè)std::function對(duì)象中,傳遞到std::thread對(duì)象中,就可作為線程函數(shù);傳遞另一個(gè)函數(shù)中,就作為可調(diào)用對(duì)象,或可以直接進(jìn)行調(diào)用。當(dāng)std::packaged_task作為一個(gè)函數(shù)調(diào)用時(shí),可為函數(shù)調(diào)用操作符提供所需的參數(shù),并且返回值作為異步結(jié)果存儲(chǔ)在std::future,可通過get_future()獲取。你可以把一個(gè)任務(wù)包含入std::packaged_task,并且在檢索期望之前,需要將std::packaged_task對(duì)象傳入,以便調(diào)用時(shí)能及時(shí)的找到。

當(dāng)你需要異步任務(wù)的返回值時(shí),你可以等待期望的狀態(tài)變?yōu)椤熬途w”。下面的代碼就是這么個(gè)情況。

線程間傳遞任務(wù)

很多圖形架構(gòu)需要特定的線程去更新界面,所以當(dāng)一個(gè)線程需要界面的更新時(shí),它需要發(fā)出一條信息給正確的線程,讓特定的線程來做界面更新。std::packaged_task提供了完成這種功能的一種方法,且不需要發(fā)送一條自定義信息給圖形界面相關(guān)線程。下面來看看代碼。

清單4.9 使用std::packaged_task執(zhí)行一個(gè)圖形界面線程

#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>

std::mutex m;
std::deque<std::packaged_task<void()> > tasks;

bool gui_shutdown_message_received();
void get_and_process_gui_message();

void gui_thread()  // 1
{
  while(!gui_shutdown_message_received())  // 2
  {
    get_and_process_gui_message();  // 3
    std::packaged_task<void()> task;
    {
      std::lock_guard<std::mutex> lk(m);
      if(tasks.empty())  // 4
        continue;
      task=std::move(tasks.front());  // 5
      tasks.pop_front();
    }
    task();  // 6
  }
}

std::thread gui_bg_thread(gui_thread);

template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
  std::packaged_task<void()> task(f);  // 7
  std::future<void> res=task.get_future();  // 8
  std::lock_guard<std::mutex> lk(m);  // 9
  tasks.push_back(std::move(task));  // 10
  return res;
}

這段代碼十分簡單:圖形界面線程①循環(huán)直到收到一條關(guān)閉圖形界面的信息后關(guān)閉②,進(jìn)行輪詢界面消息處理③,例如用戶點(diǎn)擊,和執(zhí)行在隊(duì)列中的任務(wù)。當(dāng)隊(duì)列中沒有任務(wù)④,它將再次循環(huán);除非,他能在隊(duì)列中提取出一個(gè)任務(wù)⑤,然后釋放隊(duì)列上的鎖,并且執(zhí)行任務(wù)⑥。這里,“期望”與任務(wù)相關(guān),當(dāng)任務(wù)執(zhí)行完成時(shí),其狀態(tài)會(huì)被置為“就緒”狀態(tài)。

將一個(gè)任務(wù)傳入隊(duì)列,也很簡單:提供的函數(shù)⑦可以提供一個(gè)打包好的任務(wù),可以通過這個(gè)任務(wù)⑧調(diào)用get_future()成員函數(shù)獲取“期望”對(duì)象,并且在任務(wù)被推入列表⑨之前,“期望”將返回調(diào)用函數(shù)⑩。當(dāng)需要知道線程執(zhí)行完任務(wù)時(shí),向圖形界面線程發(fā)布消息的代碼,會(huì)等待“期望”改變狀態(tài);否則,則會(huì)丟棄這個(gè)“期望”。

這個(gè)例子使用std::packaged_task<void()>創(chuàng)建任務(wù),其包含了一個(gè)無參數(shù)無返回值的函數(shù)或可調(diào)用對(duì)象(如果當(dāng)這個(gè)調(diào)用有返回值時(shí),返回值會(huì)被丟棄)。這可能是最簡單的任務(wù),如你之前所見,std::packaged_task也可以用于一些復(fù)雜的情況——通過指定一個(gè)不同的函數(shù)簽名作為模板參數(shù),你不僅可以改變其返回類型(因此該類型的數(shù)據(jù)會(huì)存在期望相關(guān)的狀態(tài)中),而且也可以改變函數(shù)操作符的參數(shù)類型。這個(gè)例子可以簡單的擴(kuò)展成允許任務(wù)運(yùn)行在圖形界面線程上,且接受傳參,還有通過std::future返回值,而不僅僅是完成一個(gè)指標(biāo)。

這些任務(wù)能作為一個(gè)簡單的函數(shù)調(diào)用來表達(dá)嗎?還有,這些任務(wù)的結(jié)果能從很多地方得到嗎?這些情況可以使用第三種方法創(chuàng)建“期望”來解決:使用std::promise對(duì)值進(jìn)行顯示設(shè)置。

4.2.3 使用std::promises

當(dāng)你有一個(gè)應(yīng)用,需要處理很多網(wǎng)絡(luò)連接,它會(huì)使用不同線程嘗試連接每個(gè)接口,因?yàn)檫@能使網(wǎng)絡(luò)盡早聯(lián)通,盡早執(zhí)行程序。當(dāng)連接較少的時(shí)候,這樣的工作沒有問題(也就是線程數(shù)量比較少)。不幸的是,隨著連接數(shù)量的增長,這種方式變的越來越不合適;因?yàn)榇罅康木€程會(huì)消耗大量的系統(tǒng)資源,還有可能造成上下文頻繁切換(當(dāng)線程數(shù)量超出硬件可接受的并發(fā)數(shù)時(shí)),這都會(huì)對(duì)性能有影響。最極端的例子就是,因?yàn)橄到y(tǒng)資源被創(chuàng)建的線程消耗殆盡,系統(tǒng)連接網(wǎng)絡(luò)的能力會(huì)變的極差。在不同的應(yīng)用程序中,存在著大量的網(wǎng)絡(luò)連接,因此不同應(yīng)用都會(huì)擁有一定數(shù)量的線程(可能只有一個(gè))來處理網(wǎng)絡(luò)連接,每個(gè)線程處理可同時(shí)處理多個(gè)連接事件。

考慮一個(gè)線程處理多個(gè)連接事件,來自不同的端口連接的數(shù)據(jù)包基本上是以亂序方式進(jìn)行處理的;同樣的,數(shù)據(jù)包也將以亂序的方式進(jìn)入隊(duì)列。在很多情況下,另一些應(yīng)用不是等待數(shù)據(jù)成功的發(fā)送,就是等待一批(新的)來自指定網(wǎng)絡(luò)接口的數(shù)據(jù)接收成功。

std::promise<T>提供設(shè)定值的方式(類型為T),這個(gè)類型會(huì)和后面看到的std::future<T> 對(duì)象相關(guān)聯(lián)。一對(duì)std::promise/std::future會(huì)為這種方式提供一個(gè)可行的機(jī)制;在期望上可以阻塞等待線程,同時(shí),提供數(shù)據(jù)的線程可以使用組合中的“承諾”來對(duì)相關(guān)值進(jìn)行設(shè)置,以及將“期望”的狀態(tài)置為“就緒”。

可以通過get_future()成員函數(shù)來獲取與一個(gè)給定的std::promise相關(guān)的std::future對(duì)象,就像是與std::packaged_task相關(guān)。當(dāng)“承諾”的值已經(jīng)設(shè)置完畢(使用set_value()成員函數(shù)),對(duì)應(yīng)“期望”的狀態(tài)變?yōu)椤熬途w”,并且可用于檢索已存儲(chǔ)的值。當(dāng)你在設(shè)置值之前銷毀std::promise,將會(huì)存儲(chǔ)一個(gè)異常。在4.2.4節(jié)中,會(huì)詳細(xì)描述異常是如何傳送到線程的。

清單4.10中,是單線程處理多接口的實(shí)現(xiàn),如同我們所說的那樣。在這個(gè)例子中,你可以使用一對(duì)std::promise<bool>/std::future<bool>找出一塊傳出成功的數(shù)據(jù)塊;與“期望”相關(guān)值只是一個(gè)簡單的“成功/失敗”標(biāo)識(shí)。對(duì)于傳入包,與“期望”相關(guān)的數(shù)據(jù)就是數(shù)據(jù)包的有效負(fù)載。

清單4.10 使用“承諾”解決單線程多連接問題

#include <future>

void process_connections(connection_set& connections)
{
  while(!done(connections))  // 1
  {
    for(connection_iterator  // 2
            connection=connections.begin(),end=connections.end();
          connection!=end;
          ++connection)
    {
      if(connection->has_incoming_data())  // 3
      {
        data_packet data=connection->incoming();
        std::promise<payload_type>& p=
            connection->get_promise(data.id);  // 4
        p.set_value(data.payload);
      }
      if(connection->has_outgoing_data())  // 5
      {
        outgoing_packet data=
            connection->top_of_outgoing_queue();
        connection->send(data.payload);
        data.promise.set_value(true);  // 6
      }
    }
  }
}

函數(shù)process_connections()中,直到done()返回true①為止。每一次循環(huán),程序都會(huì)依次的檢查每一個(gè)連接②,檢索是否有數(shù)據(jù)③或正在發(fā)送已入隊(duì)的傳出數(shù)據(jù)⑤。這里假設(shè)輸入數(shù)據(jù)包是具有ID和有效負(fù)載的(有實(shí)際的數(shù)在其中)。一個(gè)ID映射到一個(gè)std::promise(可能是在相關(guān)容器中進(jìn)行的依次查找)④,并且值是設(shè)置在包的有效負(fù)載中的。對(duì)于傳出包,包是從傳出隊(duì)列中進(jìn)行檢索的,實(shí)際上從接口直接發(fā)送出去。當(dāng)發(fā)送完成,與傳出數(shù)據(jù)相關(guān)的“承諾”將置為true,來表明傳輸成功⑥。這是否能映射到實(shí)際網(wǎng)絡(luò)協(xié)議上,取決于網(wǎng)絡(luò)所用協(xié)議;這里的“承諾/期望”組合方式可能會(huì)在特殊的情況下無法工作,但是它與一些操作系統(tǒng)支持的異步輸入/輸出結(jié)構(gòu)類似。

上面的代碼完全不理會(huì)異常,它可能在想象的世界中,一切工作都會(huì)很好的執(zhí)行,但是這有悖常理。有時(shí)候磁盤滿載,有時(shí)候你會(huì)找不到東西,有時(shí)候網(wǎng)絡(luò)會(huì)斷,還有時(shí)候數(shù)據(jù)庫會(huì)奔潰。當(dāng)你需要某個(gè)操作的結(jié)果時(shí),你就需要在對(duì)應(yīng)的線程上執(zhí)行這個(gè)操作,因?yàn)榇a可以通過一個(gè)異常來報(bào)告錯(cuò)誤;不過使用std::packaged_taskstd::promise,就會(huì)帶來一些不必要的限制(在所有工作都正常的情況下)。因此,C++標(biāo)準(zhǔn)庫提供了一種在以上情況下清理異常的方法,并且允許他們將異常存儲(chǔ)為相關(guān)結(jié)果的一部分。

4.2.4 為“期望”存儲(chǔ)“異常”

看完下面短小的代碼段,思考一下,當(dāng)你傳遞-1到square_root()中時(shí),它將拋出一個(gè)異常,并且這個(gè)異常將會(huì)被調(diào)用者看到:

double square_root(double x)
{
  if(x<0)
  {
    throw std::out_of_range(“x<0”);
  }
  return sqrt(x);
}

假設(shè)調(diào)用square_root()函數(shù)不是當(dāng)前線程,

double y=square_root(-1);

你將這樣的調(diào)用改為異步調(diào)用:

std::future<double> f=std::async(square_root,-1);
double y=f.get();

如果行為是完全相同的時(shí)候,其結(jié)果是理想的;在任何情況下,y獲得函數(shù)調(diào)用的結(jié)果,當(dāng)線程調(diào)用f.get()時(shí),就能再看到異常了,即使在一個(gè)單線程例子中。

好吧,事實(shí)的確如此:函數(shù)作為std::async的一部分時(shí),當(dāng)在調(diào)用時(shí)拋出一個(gè)異常,那么這個(gè)異常就會(huì)存儲(chǔ)到“期望”的結(jié)果數(shù)據(jù)中,之后“期望”的狀態(tài)被置為“就緒”,之后調(diào)用get()會(huì)拋出這個(gè)存儲(chǔ)的異常。(注意:標(biāo)準(zhǔn)級(jí)別沒有指定重新拋出的這個(gè)異常是原始的異常對(duì)象,還是一個(gè)拷貝;不同的編譯器和庫將會(huì)在這方面做出不同的選擇)。當(dāng)你將函數(shù)打包入std::packaged_task任務(wù)包中后,在這個(gè)任務(wù)被調(diào)用時(shí),同樣的事情也會(huì)發(fā)生;當(dāng)打包函數(shù)拋出一個(gè)異常,這個(gè)異常將被存儲(chǔ)在“期望”的結(jié)果中,準(zhǔn)備在調(diào)用get()再次拋出。

當(dāng)然,通過函數(shù)的顯式調(diào)用,std::promise也能提供同樣的功能。當(dāng)你希望存入的是一個(gè)異常而非一個(gè)數(shù)值時(shí),你就需要調(diào)用set_exception()成員函數(shù),而非set_value()。這通常是用在一個(gè)catch塊中,并作為算法的一部分,為了捕獲異常,使用異常填充“承諾”:

extern std::promise<double> some_promise;
try
{
  some_promise.set_value(calculate_value());
}
catch(...)
{
  some_promise.set_exception(std::current_exception());
}

這里使用了std::current_exception()來檢索拋出的異常;可用std::copy_exception()作為一個(gè)替換方案,std::copy_exception()會(huì)直接存儲(chǔ)一個(gè)新的異常而不拋出:

some_promise.set_exception(std::copy_exception(std::logic_error("foo ")));

這就比使用try/catch塊更加清晰,當(dāng)異常類型是已知的,它就應(yīng)該優(yōu)先被使用;不是因?yàn)榇a實(shí)現(xiàn)簡單,而是它給編譯器提供了極大的代碼優(yōu)化空間。

另一種向“期望”中存儲(chǔ)異常的方式是,在沒有調(diào)用“承諾”上的任何設(shè)置函數(shù)前,或正在調(diào)用包裝好的任務(wù)時(shí),銷毀與std::promisestd::packaged_task相關(guān)的“期望”對(duì)象。在這任何情況下,當(dāng)“期望”的狀態(tài)還不是“就緒”時(shí),調(diào)用std::promisestd::packaged_task的析構(gòu)函數(shù),將會(huì)存儲(chǔ)一個(gè)與std::future_errc::broken_promise錯(cuò)誤狀態(tài)相關(guān)的std::future_error異常;通過創(chuàng)建一個(gè)“期望”,你可以構(gòu)造一個(gè)“承諾”為其提供值或異常;你可以通過銷毀值和異常源,去違背“承諾”。在這種情況下,編譯器沒有在“期望”中存儲(chǔ)任何東西,等待線程可能會(huì)永遠(yuǎn)的等下去。

直到現(xiàn)在,所有例子都在用std::future。不過,std::future也有局限性,在很多線程在等待的時(shí)候,只有一個(gè)線程能獲取等待結(jié)果。當(dāng)多個(gè)線程需要等待相同的事件的結(jié)果,你就需要使用std::shared_future來替代std::future了。

4.2.5 多個(gè)線程的等待

雖然std::future可以處理所有在線程間數(shù)據(jù)轉(zhuǎn)移的必要同步,但是調(diào)用某一特殊std::future對(duì)象的成員函數(shù),就會(huì)讓這個(gè)線程的數(shù)據(jù)和其他線程的數(shù)據(jù)不同步。當(dāng)多線程在沒有額外同步的情況下,訪問一個(gè)獨(dú)立的std::future對(duì)象時(shí),就會(huì)有數(shù)據(jù)競爭和未定義的行為。這是因?yàn)椋?code>std::future模型獨(dú)享同步結(jié)果的所有權(quán),并且通過調(diào)用get()函數(shù),一次性的獲取數(shù)據(jù),這就讓并發(fā)訪問變的毫無意義——只有一個(gè)線程可以獲取結(jié)果值,因?yàn)樵诘谝淮握{(diào)用get()后,就沒有值可以再獲取了。

如果你的并行代碼沒有辦法讓多個(gè)線程等待同一個(gè)事件,先別太失落;std::shared_future可以來幫你解決。因?yàn)?code>std::future是只移動(dòng)的,所以其所有權(quán)可以在不同的實(shí)例中互相傳遞,但是只有一個(gè)實(shí)例可以獲得特定的同步結(jié)果;而std::shared_future實(shí)例是可拷貝的,所以多個(gè)對(duì)象可以引用同一關(guān)聯(lián)“期望”的結(jié)果。

在每一個(gè)std::shared_future的獨(dú)立對(duì)象上成員函數(shù)調(diào)用返回的結(jié)果還是不同步的,所以為了在多個(gè)線程訪問一個(gè)獨(dú)立對(duì)象時(shí),避免數(shù)據(jù)競爭,必須使用鎖來對(duì)訪問進(jìn)行保護(hù)。優(yōu)先使用的辦法:為了替代只有一個(gè)拷貝對(duì)象的情況,可以讓每個(gè)線程都擁有自己對(duì)應(yīng)的拷貝對(duì)象。這樣,當(dāng)每個(gè)線程都通過自己擁有的std::shared_future對(duì)象獲取結(jié)果,那么多個(gè)線程訪問共享同步結(jié)果就是安全的??梢妶D4.1。

http://wiki.jikexueyuan.com/project/cplusplus-concurrency-action/images/chapter4/4-1.png" alt="" />

圖4.1 使用多個(gè)std::shared_future對(duì)象來避免數(shù)據(jù)競爭

有可能會(huì)使用std::shared_future的地方,例如,實(shí)現(xiàn)類似于復(fù)雜的電子表格的并行執(zhí)行;每一個(gè)單元格有單一的終值,這個(gè)終值可能是有其他單元格中的數(shù)據(jù)通過公式計(jì)算得到的。公式計(jì)算得到的結(jié)果依賴于其他單元格,然后可以使用一個(gè)std::shared_future對(duì)象引用第一個(gè)單元格的數(shù)據(jù)。當(dāng)每個(gè)單元格內(nèi)的所有公式并行執(zhí)行后,這些任務(wù)會(huì)以期望的方式完成工作;不過,當(dāng)其中有計(jì)算需要依賴其他單元格的值,那么它就會(huì)被阻塞,直到依賴單元格的數(shù)據(jù)準(zhǔn)備就緒。這將讓系統(tǒng)在最大程度上使用可用的硬件并發(fā)。

std::shared_future的實(shí)例同步std::future實(shí)例的狀態(tài)。當(dāng)std::future對(duì)象沒有與其他對(duì)象共享同步狀態(tài)所有權(quán),那么所有權(quán)必須使用std::move將所有權(quán)傳遞到std::shared_future,其默認(rèn)構(gòu)造函數(shù)如下:

std::promise<int> p;
std::future<int> f(p.get_future());
assert(f.valid());  // 1 "期望" f 是合法的
std::shared_future<int> sf(std::move(f));
assert(!f.valid());  // 2 "期望" f 現(xiàn)在是不合法的
assert(sf.valid());  // 3 sf 現(xiàn)在是合法的

這里,“期望”f開始是合法的①,因?yàn)樗玫氖恰俺兄Z”p的同步狀態(tài),但是在轉(zhuǎn)移sf的狀態(tài)后,f就不合法了②,而sf就是合法的了③。

如其他可移動(dòng)對(duì)象一樣,轉(zhuǎn)移所有權(quán)是對(duì)右值的隱式操作,所以你可以通過std::promise對(duì)象的成員函數(shù)get_future()的返回值,直接構(gòu)造一個(gè)std::shared_future對(duì)象,例如:

std::promise<std::string> p;
std::shared_future<std::string> sf(p.get_future());  // 1 隱式轉(zhuǎn)移所有權(quán)

這里轉(zhuǎn)移所有權(quán)是隱式的;用一個(gè)右值構(gòu)造std::shared_future<>,得到std::future<std::string>類型的實(shí)例①。

std::future的這種特性,可促進(jìn)std::shared_future的使用,容器可以自動(dòng)的對(duì)類型進(jìn)行推斷,從而初始化這個(gè)類型的變量(詳見附錄A,A.6節(jié))。std::future有一個(gè)share()成員函數(shù),可用來創(chuàng)建新的std::shared_future ,并且可以直接轉(zhuǎn)移“期望”的所有權(quán)。這樣也就能保存很多類型,并且使得代碼易于修改:

std::promise< std::map< SomeIndexType, SomeDataType, SomeComparator,
     SomeAllocator>::iterator> p;
auto sf=p.get_future().share();

在這個(gè)例子中,sf的類型推到為std::shared_future<std::map<SomeIndexType, SomeDataType, SomeComparator, SomeAllocator>::iterator>,一口氣還真的很難念完。當(dāng)比較器或分配器有所改動(dòng),你只需要對(duì)“承諾”的類型進(jìn)行修改即可;“期望”的類型會(huì)自動(dòng)更新,與“承諾”的修改進(jìn)行匹配。

有時(shí)候你需要限定等待一個(gè)事件的時(shí)間,不論是因?yàn)槟阍跁r(shí)間上有硬性規(guī)定(一段指定的代碼需要在某段時(shí)間內(nèi)完成),還是因?yàn)樵谑录]有很快的觸發(fā)時(shí),有其他必要的工作需要特定線程來完成。為了處理這種情況,很多等待函數(shù)具有用于指定超時(shí)的變量。


[1] 在《銀河系漫游指南》(The Hitchhiker’s Guide to the Galaxy)中, 計(jì)算機(jī)在經(jīng)過深度思考后,將“人生之匙和宇宙萬物”的答案確定為42。