Python 中的非同步上下文管理器是處理並發應用程式中資源的遊戲規則改變者。它們就像常規的上下文管理器,但有一點不同 - 它們可以與非同步程式碼無縫協作。
讓我們從基礎開始。要建立非同步上下文管理器,我們需要實作兩個特殊方法:__aenter__ 和 __aexit__。這些是我們在常規上下文管理器中使用的 __enter__ 和 __exit__ 的非同步版本。
這是一個簡單的例子:
class AsyncResource: async def __aenter__(self): print("Acquiring resource") await asyncio.sleep(1) # Simulating async acquisition return self async def __aexit__(self, exc_type, exc_value, traceback): print("Releasing resource") await asyncio.sleep(1) # Simulating async release async def main(): async with AsyncResource() as resource: print("Using resource") asyncio.run(main())
在此範例中,我們模擬資源的非同步獲取和釋放。 async with 語句負責在正確的時間呼叫 __aenter__ 和 __aexit__。
現在,我們來談談為什麼非同步上下文管理器如此有用。它們非常適合以非阻塞方式管理需要非同步操作的資源,例如資料庫連接、網路套接字或檔案處理程序。
以資料庫連線為例。我們可以建立一個管理連線池的非同步上下文管理器:
import asyncpg class DatabasePool: def __init__(self, dsn): self.dsn = dsn self.pool = None async def __aenter__(self): self.pool = await asyncpg.create_pool(self.dsn) return self.pool async def __aexit__(self, exc_type, exc_value, traceback): await self.pool.close() async def main(): async with DatabasePool('postgresql://user:password@localhost/db') as pool: async with pool.acquire() as conn: result = await conn.fetch('SELECT * FROM users') print(result) asyncio.run(main())
此設定可確保我們有效管理資料庫連線。池在我們進入上下文時創建,並在我們退出時正確關閉。
非同步上下文管理器中的錯誤處理與常規錯誤處理類似。如果上下文中發生錯誤,則 __aexit__ 方法會接收異常訊息。我們可以處理這些錯誤或讓它們傳播:
class ErrorHandlingResource: async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, traceback): if exc_type is ValueError: print("Caught ValueError, suppressing") return True # Suppress the exception return False # Let other exceptions propagate async def main(): async with ErrorHandlingResource(): raise ValueError("Oops!") print("This will be printed") async with ErrorHandlingResource(): raise RuntimeError("Unhandled!") print("This won't be printed") asyncio.run(main())
在此範例中,我們抑制 ValueError 但允許其他異常傳播。
非同步上下文管理器也非常適合實現分散式鎖定。這是一個使用 Redis 的簡單範例:
import aioredis class DistributedLock: def __init__(self, redis, lock_name, expire=10): self.redis = redis self.lock_name = lock_name self.expire = expire async def __aenter__(self): while True: locked = await self.redis.set(self.lock_name, "1", expire=self.expire, nx=True) if locked: return self await asyncio.sleep(0.1) async def __aexit__(self, exc_type, exc_value, traceback): await self.redis.delete(self.lock_name) async def main(): redis = await aioredis.create_redis_pool('redis://localhost') async with DistributedLock(redis, "my_lock"): print("Critical section") await redis.close() asyncio.run(main())
此鎖定確保一次只有一個進程可以執行關鍵部分,即使跨多台機器也是如此。
我們也可以將非同步上下文管理器用於交易範圍:
class AsyncTransaction: def __init__(self, conn): self.conn = conn async def __aenter__(self): await self.conn.execute('BEGIN') return self async def __aexit__(self, exc_type, exc_value, traceback): if exc_type is None: await self.conn.execute('COMMIT') else: await self.conn.execute('ROLLBACK') async def transfer_funds(from_account, to_account, amount): async with AsyncTransaction(conn): await conn.execute('UPDATE accounts SET balance = balance - WHERE id = ', amount, from_account) await conn.execute('UPDATE accounts SET balance = balance + WHERE id = ', amount, to_account)
此設定可確保我們的資料庫事務始終正確提交或回滾,即使面對異常也是如此。
非同步上下文管理器可以與其他非同步原語結合使用,以獲得更強大的模式。例如,我們可以將它們與 asyncio.gather 一起使用來進行平行資源管理:
async def process_data(data): async with ResourceManager() as rm: results = await asyncio.gather( rm.process(data[0]), rm.process(data[1]), rm.process(data[2]) ) return results
這使我們能夠並行處理多個數據,同時仍確保適當的資源管理。
總之,非同步上下文管理器是管理非同步 Python 程式碼中資源的強大工具。它們提供了一種乾淨、直觀的方式來處理非同步設定和拆卸、錯誤處理和資源清理。透過掌握非同步上下文管理器,您將能夠建立強大的、可擴展的 Python 應用程序,這些應用程式可以輕鬆處理複雜的並發工作流程。
一定要看看我們的創作:
投資者中心 | 智能生活 | 時代與迴響 | 令人費解的謎團 | 印度教 | 精英開發 | JS學校
科技無尾熊洞察 | 時代與迴響世界 | 投資人中央媒體 | 令人費解的謎團 | | 令人費解的謎團 | >科學與時代媒介 |
現代印度教以上是掌握非同步上下文管理器:提高 Python 程式碼的效能的詳細內容。更多資訊請關注PHP中文網其他相關文章!