鍍金池/ 問答/Python  網(wǎng)絡(luò)安全/ Python3 asyncio 在線程中使用

Python3 asyncio 在線程中使用

下面代碼中模擬了使用 2 個線程管理 event loop 的情況:

import time
import asyncio
import threading

async def task(c, i):
    for _ in range(i):
        print(c)
        await asyncio.sleep(1)
    return i

def thread(loop):  # 異步程序
    asyncio.set_event_loop(loop)
    asyncio.ensure_future(task('sub thread', 999))
    loop.run_forever()

def main():
    threading.Thread(target=thread, args=(asyncio.get_event_loop(), )).start()

    # 同步代碼開始
    future = asyncio.ensure_future(task('main thread', 5))
    while not future.done():
        time.sleep(1)
    print('main done: %s' % future.result())


if __name__ == '__main__':
    main()

其中子線程(thread 函數(shù))設(shè)定并啟用了一個 999 次的 task 任務(wù)。

而在主線程中,添加了一個 5 次的 task 任務(wù)。我設(shè)置了一個 While 循環(huán)來檢查這項任務(wù)是否已經(jīng)運行完畢,如若完畢則打印出 main done: 5 。


提出的問題:

  1. 像這樣的多線程 event loop 是否有其他方案可以實現(xiàn)?
  2. 在 main 函數(shù)的 5 個 task 的任務(wù),最初嘗試使用 run_until_complete 來等待執(zhí)行結(jié)束,但是與 run_forever 沖突導(dǎo)致拋出 RuntimeError: This event loop is already running 的錯誤,那么除了使用 While 循環(huán)外,還有其他方法阻塞后面代碼執(zhí)行嗎?

實際情況描述:

我有一個使用了 同步+異步 的程序,其中同步程序是運行在主線程上的,異步是運行在一個子線程中的。
同步與異步獨立運行,各司其職。但是有時候需要在同步中使用異步的函數(shù)或方法并取得結(jié)果。

回答
編輯回答
風(fēng)清揚(yáng)

你不需要循環(huán)調(diào)用 future.done(),用 future.result() 便可。

我建議把 eventloop 放在主線程,其它工作視類型可以放入

  1. 同(主)線程
    非阻塞(非CPU運算型)動作,例如 asyncio.sleep
  2. 從線程(池)
    阻塞(非CPU運算型)動作,例如 time.sleep
  3. 單獨進(jìn)程
    CPU運算型動作,例如計算質(zhì)數(shù)

參考

https://docs.python.org/3/lib...
https://wiki.python.org/moin/...

例子

# -*- coding: utf-8 -*-
import asyncio
from datetime import datetime


async def add(a, b):
    await asyncio.sleep(1)
    return a + b


async def master_thread(loop):
    print("{} master: 1+2={}".format(datetime.now(), await add(1, 2)))


def slave_thread(loop):
    # 注意:這不是 coroutine 函數(shù)
    import time
    time.sleep(2)

    f = asyncio.run_coroutine_threadsafe(add(1, 2), loop)
    print("{} slave: 1+2={}".format(datetime.now(), f.result()))


async def main(loop):
    await asyncio.gather(
        master_thread(loop),
        # 線程池內(nèi)執(zhí)行
        loop.run_in_executor(None, slave_thread, loop),
    )


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()
2017年2月27日 23:13