掌握 Python 协程:提高代码效率和性能

Mary-Kate Olsen
发布: 2024-11-23 09:21:22
原创
831 人浏览过

Mastering Python

让我们探索 Python 中协程和结构化并发的令人兴奋的世界。这些强大的功能彻底改变了我们编写并发代码的方式,使其更加高效且更易于管理。

协程是特殊函数,可以暂停执行并将控制权交给其他协程。它们是使用 async 关键字定义的,并且可以使用await 关键字等待。这是一个简单的例子:

async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}!")

async def main():
    await greet("Alice")
    await greet("Bob")

asyncio.run(main())
登录后复制
登录后复制

在此代码中,greet 函数是一个协程,它打印问候语,等待一秒钟,然后说再见。 main函数调用了两次greet,我们使用asyncio.run来执行主协程。

但是是什么让协程如此特别呢?它们允许我们编写看起来和行为都像同步代码的并发代码,但实际上可以同时执行多个操作。这对于 I/O 密集型任务特别有用,例如网络操作或文件处理。

让我们更深入地了解 asyncio 库,它为 Python 中的异步编程提供了基础。其核心是事件循环,它管理协程的执行。您可以将其视为决定接下来运行哪个协程的调度程序。

以下是我们如何通过 asyncio 创建和使用任务:

import asyncio

async def fetch_data(url):
    print(f"Fetching data from {url}")
    await asyncio.sleep(2)  # Simulating network delay
    return f"Data from {url}"

async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())
登录后复制
登录后复制

在此示例中,我们模拟同时从多个 URL 获取数据。 asyncio.create_task 函数将我们的协程转换为任务,然后使用 asyncio.gather 并发执行。

现在,我们来谈谈结构化并发。这是一个旨在使并发代码更可预测、更容易推理的范例。 Python 3.11 引入了一些新功能来支持结构化并发,例如任务组。

以下是我们如何使用任务组:

import asyncio

async def process_item(item):
    await asyncio.sleep(1)
    return f"Processed {item}"

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(process_item("A"))
        task2 = tg.create_task(process_item("B"))
        task3 = tg.create_task(process_item("C"))

    print(task1.result())
    print(task2.result())
    print(task3.result())

asyncio.run(main())
登录后复制
登录后复制

TaskGroup 确保在我们继续之前完成(或取消)所有任务。这有助于防止忘记任务或并发操作之间意外交互等问题。

协程最强大的方面之一是它们有效处理 I/O 操作的能力。让我们看一个简单的异步 Web 服务器的示例:

import asyncio
from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = f"Hello, {name}!"
    return web.Response(text=text)

async def main():
    app = web.Application()
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()

    print("Server started at http://localhost:8080")
    await asyncio.Event().wait()

asyncio.run(main())
登录后复制
登录后复制

由于协程的强大功能,该服务器可以同时处理多个连接。每个请求都在自己的协程中处理,使服务器即使在高负载下也能保持响应。

让我们探索一些更高级的概念。在处理并发操作时,取消是一个重要的特性。有时我们需要在任务完成之前停止它。我们可以这样做:

async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}!")

async def main():
    await greet("Alice")
    await greet("Bob")

asyncio.run(main())
登录后复制
登录后复制

在此示例中,我们创建一个长时间运行的任务并在 5 秒后取消它。该任务捕获 CancelledError 并在退出之前执行任何必要的清理。

另一个强大的功能是创建自定义事件循环的能力。虽然默认的事件循环足以满足大多数情况,但有时我们需要更多的控制。这是自定义事件循环的简单示例:

import asyncio

async def fetch_data(url):
    print(f"Fetching data from {url}")
    await asyncio.sleep(2)  # Simulating network delay
    return f"Data from {url}"

async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())
登录后复制
登录后复制

这是一个非常基本的自定义事件循环,但它演示了原理。您可以扩展它以添加功能,例如更好的调度、监控或与其他系统的集成。

让我们讨论一下使用协程和结构化并发时的一些最佳实践。首先,始终使用 async with 来管理异步上下文管理器。即使发生异常,这也能确保正确的设置和拆卸:

import asyncio

async def process_item(item):
    await asyncio.sleep(1)
    return f"Processed {item}"

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(process_item("A"))
        task2 = tg.create_task(process_item("B"))
        task3 = tg.create_task(process_item("C"))

    print(task1.result())
    print(task2.result())
    print(task3.result())

asyncio.run(main())
登录后复制
登录后复制

第二,要小心阻塞操作。如果您需要执行 CPU 密集型任务,请考虑使用 asyncio.to_thread 在单独的线程中运行它:

import asyncio
from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = f"Hello, {name}!"
    return web.Response(text=text)

async def main():
    app = web.Application()
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()

    print("Server started at http://localhost:8080")
    await asyncio.Event().wait()

asyncio.run(main())
登录后复制
登录后复制

第三,当您需要对一组任务进行更多控制时,请使用 asyncio.wait。它允许您等待第一个任务完成,或设置超时:

import asyncio

async def long_running_task():
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")

async def main():
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(5)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())
登录后复制

调试并发代码可能具有挑战性。 Python 的 asyncio 附带了一些有用的工具。您可以启用调试模式以获得更详细的日志记录:

import asyncio

class MyEventLoop(asyncio.BaseEventLoop):
    def __init__(self):
        self._running = False
        self._ready = asyncio.Queue()

    def run_forever(self):
        self._running = True
        while self._running:
            coro = self._ready.get_nowait()
            if coro:
                coro.send(None)

    def stop(self):
        self._running = False

    def call_soon(self, callback, *args):
        self._ready.put_nowait(callback(*args))

# Usage
loop = MyEventLoop()
asyncio.set_event_loop(loop)

async def my_coroutine():
    print("Hello from my coroutine!")

loop.call_soon(my_coroutine)
loop.run_forever()
登录后复制

您还可以使用 aiodebug 库来实现更高级的调试功能。

让我们看一个更复杂的示例:并行数据处理管道。这对于处理大型数据集或处理流数据等任务非常有用:

async with aiohttp.ClientSession() as session:
    async with session.get('http://example.com') as response:
        html = await response.text()
登录后复制

此管道演示了我们如何使用队列在不同处理阶段之间传递数据,所有阶段都同时运行。

协程和结构化并发为 Python 编程开辟了新的可能性。它们使我们能够编写更易于推理和维护的高效并发代码。无论您是构建 Web 服务器、数据处理管道还是响应式 GUI,这些工具都可以帮助您创建强大的高性能应用程序。

记住,掌握这些概念的关键是练习。从简单的示例开始,逐渐构建更复杂的用例。注意错误处理和取消,因为这些对于构建可靠的异步系统至关重要。不要害怕深入研究 asyncio 源代码 - 这是加深您对这些强大功能在幕后如何工作的理解的好方法。

当您继续探索协程和结构化并发时,您将发现新的模式和技术,可以使您的代码更加高效和富有表现力。这是 Python 开发中一个令人兴奋的领域,并且是一个不断发展的领域。因此,继续学习,继续尝试,享受异步编程世界的旅程!


我们的创作

一定要看看我们的创作:

投资者中心 | 智能生活 | 时代与回声 | 令人费解的谜团 | 印度教 | 精英开发 | JS学校


我们在媒体上

科技考拉洞察 | 时代与回响世界 | 投资者中央媒体 | 令人费解的谜团 | 科学与时代媒介 | 现代印度教

以上是掌握 Python 协程:提高代码效率和性能的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:dev.to
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
作者最新文章
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板