讓我們探索 Python 中協程和結構化並發的令人興奮的世界。這些強大的功能徹底改變了我們編寫並發程式碼的方式,使其更有效率且更易於管理。
協程是特殊函數,可以暫停執行並將控制權交給其他協程。它們是使用 async 關鍵字定義的,並且可以使用await 關鍵字等待。這是一個簡單的例子:
async def greet(name): print(f"Hello, {name}!") await asyncio.sleep(1) print(f"Goodbye, {name}!") async def main(): await greet("Alice") await greet("Bob") asyncio.run(main())
在此程式碼中,greet 函數是一個協程,它會列印問候語,等待一秒鐘,然後說再見。 main函式呼叫了兩次greet,我們使用asyncio.run來執行主協程。
但是是什麼讓協程如此特別呢?它們允許我們編寫看起來和行為都像同步程式碼的並發程式碼,但實際上可以同時執行多個操作。這對於 I/O 密集型任務特別有用,例如網路操作或檔案處理。
讓我們更深入地了解 asyncio 函式庫,它為 Python 中的非同步程式設計提供了基礎。其核心是事件循環,它管理協程的執行。您可以將其視為決定接下來要執行哪個協程的調度程序。
以下是我們如何透過 asyncio 建立和使用任務:
import asyncio async def fetch_data(url): print(f"Fetching data from {url}") await asyncio.sleep(2) # Simulating network delay return f"Data from {url}" async def main(): urls = ['http://example.com', 'http://example.org', 'http://example.net'] tasks = [asyncio.create_task(fetch_data(url)) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(result) asyncio.run(main())
在此範例中,我們模擬同時從多個 URL 取得資料。 asyncio.create_task 函數將我們的協程轉換為任務,然後使用 asyncio.gather 並發執行。
現在,我們來談談結構化並發。這是一個旨在使並發程式碼更可預測、更容易推理的範例。 Python 3.11 引入了一些新功能來支援結構化並發,例如任務組。
以下是我們如何使用任務群組:
import asyncio async def process_item(item): await asyncio.sleep(1) return f"Processed {item}" async def main(): async with asyncio.TaskGroup() as tg: task1 = tg.create_task(process_item("A")) task2 = tg.create_task(process_item("B")) task3 = tg.create_task(process_item("C")) print(task1.result()) print(task2.result()) print(task3.result()) asyncio.run(main())
TaskGroup 確保在我們繼續之前完成(或取消)所有任務。這有助於防止忘記任務或並發操作之間意外互動等問題。
協程最強大的方面之一是它們有效處理 I/O 操作的能力。讓我們來看一個簡單的非同步 Web 伺服器的範例:
import asyncio from aiohttp import web async def handle(request): name = request.match_info.get('name', "Anonymous") text = f"Hello, {name}!" return web.Response(text=text) async def main(): app = web.Application() app.add_routes([web.get('/', handle), web.get('/{name}', handle)]) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, 'localhost', 8080) await site.start() print("Server started at http://localhost:8080") await asyncio.Event().wait() asyncio.run(main())
由於協程的強大功能,此伺服器可以同時處理多個連線。每個請求都在自己的協程中處理,使伺服器即使在高負載下也能保持回應。
讓我們來探索一些更高階的概念。在處理並發操作時,取消是重要的特性。有時我們需要在任務完成之前停止它。我們可以這樣做:
async def greet(name): print(f"Hello, {name}!") await asyncio.sleep(1) print(f"Goodbye, {name}!") async def main(): await greet("Alice") await greet("Bob") asyncio.run(main())
在此範例中,我們建立一個長時間運行的任務並在 5 秒後取消它。該任務捕獲 CancelledError 並在退出之前執行任何必要的清理。
另一個強大的功能是建立自訂事件循環的能力。雖然預設的事件循環足以滿足大多數情況,但有時我們需要更多的控制。這是自訂事件循環的簡單範例:
import asyncio async def fetch_data(url): print(f"Fetching data from {url}") await asyncio.sleep(2) # Simulating network delay return f"Data from {url}" async def main(): urls = ['http://example.com', 'http://example.org', 'http://example.net'] tasks = [asyncio.create_task(fetch_data(url)) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(result) asyncio.run(main())
這是一個非常基本的自訂事件循環,但它演示了原理。您可以擴展它以添加功能,例如更好的調度、監控或與其他系統的整合。
讓我們討論一下使用協程和結構化並發時的一些最佳實踐。首先,始終使用 async with 來管理非同步上下文管理器。即使發生異常,這也能確保正確的設定和拆卸:
import asyncio async def process_item(item): await asyncio.sleep(1) return f"Processed {item}" async def main(): async with asyncio.TaskGroup() as tg: task1 = tg.create_task(process_item("A")) task2 = tg.create_task(process_item("B")) task3 = tg.create_task(process_item("C")) print(task1.result()) print(task2.result()) print(task3.result()) asyncio.run(main())
第二,要小心阻塞操作。如果您需要執行 CPU 密集型任務,請考慮使用 asyncio.to_thread 在單獨的執行緒中執行它:
import asyncio from aiohttp import web async def handle(request): name = request.match_info.get('name', "Anonymous") text = f"Hello, {name}!" return web.Response(text=text) async def main(): app = web.Application() app.add_routes([web.get('/', handle), web.get('/{name}', handle)]) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, 'localhost', 8080) await site.start() print("Server started at http://localhost:8080") await asyncio.Event().wait() asyncio.run(main())
第三,當您需要對一組任務進行更多控制時,請使用 asyncio.wait。它允許您等待第一個任務完成,或設定超時:
import asyncio async def long_running_task(): try: while True: print("Working...") await asyncio.sleep(1) except asyncio.CancelledError: print("Task was cancelled") async def main(): task = asyncio.create_task(long_running_task()) await asyncio.sleep(5) task.cancel() try: await task except asyncio.CancelledError: print("Main: task was cancelled") asyncio.run(main())
除錯並發程式碼可能具有挑戰性。 Python 的 asyncio 附帶了一些有用的工具。您可以啟用偵錯模式以獲得更詳細的日誌記錄:
import asyncio class MyEventLoop(asyncio.BaseEventLoop): def __init__(self): self._running = False self._ready = asyncio.Queue() def run_forever(self): self._running = True while self._running: coro = self._ready.get_nowait() if coro: coro.send(None) def stop(self): self._running = False def call_soon(self, callback, *args): self._ready.put_nowait(callback(*args)) # Usage loop = MyEventLoop() asyncio.set_event_loop(loop) async def my_coroutine(): print("Hello from my coroutine!") loop.call_soon(my_coroutine) loop.run_forever()
您也可以使用 aiodebug 函式庫來實現更進階的偵錯功能。
讓我們來看一個更複雜的範例:並行資料處理管道。這對於處理大型資料集或處理流資料等任務非常有用:
async with aiohttp.ClientSession() as session: async with session.get('http://example.com') as response: html = await response.text()
此管道示範了我們如何使用佇列在不同處理階段之間傳遞數據,所有階段都同時運行。
協程和結構化並發為 Python 程式設計開啟了新的可能性。它們使我們能夠編寫更易於推理和維護的高效並發程式碼。無論您是建立 Web 伺服器、資料處理管道還是響應式 GUI,這些工具都可以幫助您建立強大的高效能應用程式。
記住,掌握這些概念的關鍵是練習。從簡單的範例開始,逐漸建立更複雜的用例。注意錯誤處理和取消,因為這些對於建立可靠的非同步系統至關重要。不要害怕深入研究 asyncio 原始碼 - 這是加深您對這些強大功能在幕後如何運作的理解的好方法。
當您繼續探索協程和結構化並發時,您將發現新的模式和技術,可以使您的程式碼更有效率和更有表現力。這是 Python 開發中一個令人興奮的領域,也是一個不斷發展的領域。因此,繼續學習,繼續嘗試,享受非同步程式設計世界的旅程!
一定要看看我們的創作:
投資者中心 | 智能生活 | 時代與迴聲 | 令人費解的謎團 | 印度教 | 精英開發 | JS學校
科技無尾熊洞察 | 時代與迴響世界 | 投資人中央媒體 | 令人費解的謎團 | | 令人費解的謎團 | >科學與時代媒介 |
現代印度教以上是掌握 Python 協程:提高程式碼效率和效能的詳細內容。更多資訊請關注PHP中文網其他相關文章!