這篇文章主要為大家詳細介紹了Python使用asyncio包處理並發的相關資料,具有一定的參考價值,有興趣的小伙伴們可以參考一下
阻塞型I/ O和GIL
CPython 解釋器本身就不是線程安全的,因此有全域解釋器鎖定(GIL),一次只允許使用一個執行緒執行Python 字節碼。因此,一個 Python 進程通常不能同時使用多個 CPU 核心。
然而,標準函式庫中所有執行阻塞型 I/O 操作的函數,在等待作業系統回傳結果時都會釋放GIL。這意味著在 Python 語言這個層次上可以使用多線程,而 I/O 密集型 Python 程式能從中受益:當一個 Python 線程等待網路回應時,阻塞型 I/O 函數會釋放 GIL,再運行一個線程。
asyncio
這個套件使用事件循環驅動的協程實作並發。 asyncio 大量使用 yield from 表達式,因此與Python 舊版不相容。
asyncio 套件使用的「協程」是較嚴格的定義。適合asyncio API 的協程在定義體中必須使用 yield from,而不能使用 yield。此外,適合asyncio 的協程要由呼叫方驅動,並由呼叫方透過yield from 呼叫;
範例1
##
import threading import asyncio @asyncio.coroutine def hello(): print('Start Hello', threading.currentThread()) yield from asyncio.sleep(5) print('End Hello', threading.currentThread()) @asyncio.coroutine def world(): print('Start World', threading.currentThread()) yield from asyncio.sleep(3) print('End World', threading.currentThread()) # 获取EventLoop: loop = asyncio.get_event_loop() tasks = [hello(), world()] # 执行coroutine loop.run_until_complete(asyncio.wait(tasks)) loop.close()
asyncio.sleep(3) 建立一個3秒後完成的協程。
loop.run_until_complete(future),執行直到future完成;如果參數是 coroutine object,則需要使用 ensure_future()函式包裝。
loop.close() 關閉事件循環
範例2
#
import asyncio @asyncio.coroutine def worker(text): """ 协程运行的函数 :param text: :return: """ i = 0 while True: print(text, i) try: yield from asyncio.sleep(.1) except asyncio.CancelledError: break i += 1 @asyncio.coroutine def client(text, io_used): worker_fu = asyncio.ensure_future(worker(text)) # 假装等待I/O一段时间 yield from asyncio.sleep(io_used) # 结束运行协程 worker_fu.cancel() return 'done' loop = asyncio.get_event_loop() tasks = [client('xiaozhe', 3), client('zzzz', 5)] result = loop.run_until_complete(asyncio.wait(tasks)) loop.close() print('Answer:', result)
2. worker_fu.cancel(): 取消一個協程的執行,拋出CancelledError例外。
3. asyncio.wait():協程的參數是一個由期物或協程構成的可迭代物件; wait 會分別把各個協程包裝進一個 Task 物件。
asyncio.Task 物件與threading.Thread物件的比較
asyncio.Task 物件差不多與 threading.Thread 物件等效。Task 物件用於驅動協程, Thread 物件用於呼叫可呼叫的物件。
Task 物件不是由自己動手實例化,而是透過把協程傳給 asyncio.ensure_future(…) 函數或loop.create_task(…) 方法來取得。
取得的 Task 物件已經排定了運行時間;Thread 實例則必須呼叫 start 方法,明確告知讓它運行。
如果想要終止任務,可以使用 Task.cancel() 實例方法,在協程內部拋出CancelledError 例外。
執行緒與協程的安全性比較
如果使用執行緒做過重要的編程,因為排程器任何時候都能中斷執行緒。必須記住保留鎖,去保護程式中的重要部分,防止多步驟操作在執行的過程中中斷,防止資料處於無效狀態。 協程預設會做好全方位保護,以防止中斷。我們必須顯式產出才能讓程式的餘下部分運作。對協程來說,無須保留鎖,在多個執行緒之間同步操作,協程本身就會同步,因為在任意時刻只有一個協程運行。想交出控制權時,可以使用 yield 或 yield from 把控制權交還調度程式。這就是能夠安全地取消協程的原因:按照定義,協程只能在暫停的 yield處取消,因此可以處理 CancelledError 異常,執行清理操作。Future(期物)
通常自己不應該建立期物,而只能由並發框架(concurrent.futures 或 asyncio)實例化。原因很簡單:期物表示終將發生的事情,而決定某件事會發生的唯一方式是執行的時間已經排定。asyncio.Future
在asyncio 套件中, BaseEventLoop.create_task(…) 方法接收一個協程,排定它的運行時間,然後傳回一個asyncio. Task 實例-也是asyncio.Future 類別的實例,因為Task 是Future 的子類,用來包裝協程。 asyncio.ensure_future(coro_or_future, *, loop=None)這個函數統一了協程和期物:第一個參數可以是二者中的任何一個。如果是 Future 或 Task 對象,那就原封不動地回傳。如果是協程,那麼 async 函數會呼叫loop.create_task(…) 方法建立 Task 物件。 loop= 關鍵字參數是可選的,用於傳入事件循環;如果沒有傳入,那麼 async 函數會透過呼叫 asyncio.get_event_loop() 函數來取得循環物件。 BaseEventLoop.create_task(coro)這個方法排定協程的執行時間,傳回一個 asyncio.Task 物件。 asyncio 套件中有多個函式會自動把參數指定的協程包裝在 asyncio.Task 物件中,例如 BaseEventLoop.run_until_complete(…) 方法。asyncio.as_completed#
为了集成进度条,我们可以使用的是 as_completed 生成器函数;幸好, asyncio 包提供了这个生成器函数的相应版本。
使用asyncio和aiohttp包
从 Python 3.4 起, asyncio 包只直接支持 TCP 和 UDP。如果想使用 HTTP 或其他协议,那么要借助第三方包 aiohttp 。
cc_list = ['China', 'USA'] @asyncio.coroutine def get_flag(cc): url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower()) resp = yield from aiohttp.request('GET', url) image = yield from resp.read() return image @asyncio.coroutine def download_one(name): image = yield from get_flag(name) save_flag(image, name.lower() + '.gif') return name loop = asyncio.get_event_loop() wait_coro = asyncio.wait([download_one(cc) for cc in sorted(cc_list)]) res, _ = loop.run_until_complete(wait_coro) loop.close()
使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我们编写的协程)驱动执行低层异步 I/O 操作的库函数。
避免阻塞型调用
有两种方法能避免阻塞型调用中止整个应用程序的进程:
1. 在单独的线程中运行各个阻塞型操作
2. 把每个阻塞型操作转换成非阻塞的异步调用使用
多个线程是可以的,但是各个操作系统线程(Python 使用的是这种线程)消耗的内存达兆字节(具体的量取决于操作系统种类)。如果要处理几千个连接,而每个连接都使用一个线程的话,我们负担不起。
把生成器当作协程使用是异步编程的另一种方式。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多。各个暂停的协程是要消耗内存,但是比线程消耗的内存数量级小。
上面的脚本为什么会很快
在上面的脚本中,调用 loop.run_until_complete 方法时,事件循环驱动各个download_one 协程,运行到第一个 yield from 表达式处时,那个表达式驱动各个get_flag 协程,然后在get_flag协程里面运行到第一个 yield from 表达式处时,调用 aiohttp.request(…)函数。这些调用都不会阻塞,因此在零点几秒内所有请求全部开始。
asyncio 的基础设施获得第一个响应后,事件循环把响应发给等待结果的 get_flag 协程。得到响应后, get_flag 向前执行到下一个 yield from 表达式处,调用resp.read() 方法,然后把控制权还给主循环。其他响应会陆续返回。所有 get_ flag 协程都获得结果后,委派生成器 download_one 恢复,保存图像文件。
async和await
为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。
async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
1. 把@asyncio.coroutine替换为async
2. 把yield from替换为await
例如:
@asyncio.coroutine def hello(): print("Hello world!") r = yield from asyncio.sleep(1) print("Hello again!")
等同于
async def hello(): print("Hello world!") r = await asyncio.sleep(1) print("Hello again!")
网站请求实例
import asyncio import aiohttp urls = [ 'http://www.163.com/', 'http://www.sina.com.cn/', 'https://www.hupu.com/', 'http://www.php.cn/' ] async def get_url_data(u): """ 读取url的数据 :param u: :return: """ print('running ', u) async with aiohttp.ClientSession() as session: async with session.get(u) as resp: print(u, resp.status, type(resp.text())) # print(await resp.text()) return resp.headers async def request_url(u): """ 主调度函数 :param u: :return: """ res = await get_url_data(u) return res loop = asyncio.get_event_loop() task_lists = asyncio.wait([request_url(u) for u in urls]) all_res, _ = loop.run_until_complete(task_lists) loop.close() print(all_res)
以上是詳解Python使用asyncio包處理併發的方法的詳細內容。更多資訊請關注PHP中文網其他相關文章!