Python 3.5 引入了异步 I/O 作为线程的替代方案来处理并发。异步 I/O 和 Python 中的 asyncio 实现的优势在于,通过不产生内存消耗大的操作系统线程,系统使用更少的资源并且更具可扩展性。此外,在 asyncio 中,调度点通过 await
语法明确定义,而在基于线程的并发中,GIL 可能会在难以预测的代码点释放。因此,基于 asyncio 的并发系统更容易理解和调试。最终,可以取消 asyncio 任务,而这在使用线程时不容易做到。
但是,为了真正受益于这些优势,在异步协程中避免阻塞调用非常重要。阻塞调用可以是网络调用、文件系统调用、sleep
调用等等。这些阻塞调用是有害的,因为在底层,asyncio 使用单线程事件循环来并发运行协程。因此,如果在协程中进行阻塞调用,它会阻塞整个事件循环和所有协程,从而影响应用程序的整体性能。
以下是一个阻塞调用阻止代码并发执行的示例:
import asyncio import datetime import time async def example(name): print(f"{datetime.datetime.now()}: {name} start") time.sleep(1) # time.sleep 是一个阻塞函数 print(f"{datetime.datetime.now()}: {name} stop") async def main(): await asyncio.gather(example("1"), example("2")) asyncio.run(main())
运行结果类似于:
<code>2025-01-07 18:50:15.327677: 1 start 2025-01-07 18:50:16.328330: 1 stop 2025-01-07 18:50:16.328404: 2 start 2025-01-07 18:50:17.333159: 2 stop</code>
可以看到,两个协程没有并发运行。
为了克服这个问题,你需要使用非阻塞等效项或将执行推迟到线程池:
import asyncio import datetime import time async def example(name): print(f"{datetime.datetime.now()}: {name} start") await asyncio.sleep(1) # 将阻塞的 time.sleep 调用替换为非阻塞的 asyncio.sleep 协程 print(f"{datetime.datetime.now()}: {name} stop") async def main(): await asyncio.gather(example("1"), example("2")) asyncio.run(main())
运行结果类似于:
<code>2025-01-07 18:53:53.579738: 1 start 2025-01-07 18:53:53.579797: 2 start 2025-01-07 18:53:54.580463: 1 stop 2025-01-07 18:53:54.580572: 2 stop</code>
这里两个协程并发运行。
现在的问题是,并不总是很容易识别一个方法是否阻塞。特别是如果代码库很大或使用第三方库。有时,阻塞调用是在代码的深层部分进行的。
例如,这段代码是否阻塞?
import blockbuster from importlib.metadata import version async def get_version(): return version("blockbuster")
Python 是否在启动时将包元数据加载到内存中?是在加载 blockbuster
模块时完成的吗?或者在我们调用 version()
时?结果是否被缓存,后续调用将是非阻塞的吗?正确答案是在调用 version()
时完成的,它涉及读取已安装包的 METADATA 文件。并且结果没有被缓存。因此,version()
是一个阻塞调用,应该始终推迟到线程中。如果不深入研究 importlib
的代码,很难知道这个事实。
检测阻塞调用的一种方法是激活 asyncio 的调试模式来记录耗时过长的阻塞调用。但这并不是最有效的方法,因为许多短于触发超时时间的阻塞仍然会损害性能,并且测试/开发中的阻塞时间可能与生产环境中的不同。例如,如果数据库必须获取大量数据,则数据库调用在生产环境中可能需要更长时间。
这就是 BlockBuster 发挥作用的地方!激活后,BlockBuster 将修补几个阻塞的 Python 框架方法,如果它们从 asyncio 事件循环调用,则会引发错误。默认修补的方法包括 os
、io
、time
、socket
、sqlite
模块的方法。有关 BlockBuster 检测到的方法的完整列表,请参阅项目自述文件。然后,你可以在单元测试或开发模式中激活 BlockBuster 来捕获任何阻塞调用并修复它们。如果你知道 JVM 中很棒的 BlockHound 库,它的原理相同,但适用于 Python。BlockHound 是 BlockBuster 的一个很好的灵感来源,感谢创建者。
让我们看看如何在上面阻塞代码片段上使用 BlockBuster。
首先,我们需要安装 blockbuster
包
import asyncio import datetime import time async def example(name): print(f"{datetime.datetime.now()}: {name} start") time.sleep(1) # time.sleep 是一个阻塞函数 print(f"{datetime.datetime.now()}: {name} stop") async def main(): await asyncio.gather(example("1"), example("2")) asyncio.run(main())
然后,我们可以使用 pytest fixture 和 blockbuster_ctx()
方法来在每个测试开始时激活 BlockBuster,并在拆卸期间停用它。
<code>2025-01-07 18:50:15.327677: 1 start 2025-01-07 18:50:16.328330: 1 stop 2025-01-07 18:50:16.328404: 2 start 2025-01-07 18:50:17.333159: 2 stop</code>
如果你用 pytest 运行这个,你会得到
import asyncio import datetime import time async def example(name): print(f"{datetime.datetime.now()}: {name} start") await asyncio.sleep(1) # 将阻塞的 time.sleep 调用替换为非阻塞的 asyncio.sleep 协程 print(f"{datetime.datetime.now()}: {name} stop") async def main(): await asyncio.gather(example("1"), example("2")) asyncio.run(main())
注意: 通常,在一个真实的项目中,
blockbuster()
fixture 将在一个conftest.py
文件中设置。
我相信 BlockBuster 在 asyncio 项目中非常有用。它已经帮助我在我参与的项目中检测到许多阻塞调用问题。但这并不是灵丹妙药。特别是,一些第三方库不使用 Python 框架方法来与网络或文件系统交互,而是包装 C 库。对于这些库,可以在测试设置中添加规则来触发这些库的阻塞调用。BlockBuster 也是开源的:非常欢迎贡献,以便在核心项目中为你的最喜欢的库添加规则。如果你看到问题和可以改进的地方,我很乐意在项目问题跟踪器中收到你的反馈。
一些链接:
以上是BlockBuster 简介:我的异步事件循环被阻止了吗?的详细内容。更多信息请关注PHP中文网其他相关文章!