目录
在协程函数中调用同步函数
在同步函数中调用异步函数
asyncio.run
asyncio.run_coroutine_threadsafe
loop.run_until_complete
create_task
首页 后端开发 Python教程 Python混合怎么使用同步和异步函数

Python混合怎么使用同步和异步函数

May 12, 2023 am 09:58 AM
python

    在协程函数中调用同步函数

    在协程函数中直接调用同步函数会阻塞事件循环,从而影响整个程序的性能。我们先来看一个例子:

    以下是使用异步 Web 框架 FastAPI 写的一个例子,FastAPI 是比较快,但不正确的操作将会变得很慢。

    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        time.sleep(10)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}
    登录后复制

    上面我们写了两个接口,假设 root 接口函数耗时 10 秒,在这 10 秒内访问 health 接口,想一想会发生什么?

    Python混合怎么使用同步和异步函数

    访问 root 接口(左),立即访问 health 接口(右),health 接口被阻塞,直至 root 接口返回后,health 接口才成功响应。

    time.sleep 就是一个「同步」函数,它会阻塞整个事件循环。

    如何解决呢?想一想以前的处理方法,如果一个函数会阻塞主线程,那么就再开一个线程让这个阻塞函数单独运行。所以,这里也是同理,开一个线程单独去运行那些阻塞式操作,比如读取文件等。

    loop.run_in_executor 方法将同步函数转换为异步非阻塞方式进行处理。具体来说,loop.run_in_executor() 可以将同步函数创建为一个线程进程,并在其中执行该函数,从而避免阻塞事件循环。

    官方例子:在线程或者进程池中执行代码。

    那么,我们使用 loop.run_in_executor 改写上面例子,如下:

    import asyncio
    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        loop = asyncio.get_event_loop()
    
        def do_blocking_work():
            time.sleep(10)
            print("Done blocking work!!")
    
        await loop.run_in_executor(None, do_blocking_work)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}
    登录后复制

    效果如下:

    Python混合怎么使用同步和异步函数

    root 接口被阻塞期间,health 依然正常访问互不影响。

    注意: 这里都是为了演示,实际在使用 FastAPI 开发时,你可以直接将 async def root 更换成 def root ,也就是将其换成同步接口函数,FastAPI 内部会自动创建线程处理这个同步接口函数。总的来说,FastAPI 内部也是依靠线程去处理同步函数从而避免阻塞主线程(或主线程中的事件循环)。

    在同步函数中调用异步函数

    协程只能在「事件循环」内被执行,且同一时刻只能有一个协程被执行。

    所以,在同步函数中调用异步函数,其本质就是将协程「扔进」事件循环中,等待该协程执行完获取结果即可。

    以下这些函数,都可以实现这个效果:

    • asyncio.run

    • asyncio.run_coroutine_threadsafe

    • loop.run_until_complete

    • create_task

    接下来,我们将一一讲解这些方法并举例说明。

    asyncio.run

    这个方法使用起来最简单,先看下如何使用,然后紧跟着讲一下哪些场景不能直接使用 asyncio.run

    import asyncio
    
    async def do_work():
        return 1
    
    def main():
        result = asyncio.run(do_work())
        print(result)  # 1
    
    if __name__ == "__main__":
        main()
    登录后复制

    直接 run 就完事了,然后接受返回值即可。

    但是需要,注意的是 asyncio.run 每次调用都会新开一个事件循环,当结束时自动关闭该事件循环。

    一个线程内只存在一个事件循环,所以如果当前线程已经有存在的事件循环了,就不应该使用 asyncio.run 了,否则就会抛出如下异常:

    RuntimeError: asyncio.run() cannot be called from a running event loop

    因此,asyncio.run 用作新开一个事件循环时使用。

    asyncio.run_coroutine_threadsafe

    文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe

    向指定事件循环提交一个协程。(线程安全)
    返回一个 concurrent.futures.Future 以等待来自其他 OS 线程的结果。

    换句话说,就是将协程丢给其他线程中的事件循环去运行

    值得注意的是这里的「事件循环」应该是其他线程中的事件循环,非当前线程的事件循环。

    其返回的结果是一个 future 对象,如果你需要获取协程的执行结果可以使用 future.result() 获取,关于 future 对象的更多介绍,见 https://docs.python.org/zh-cn/3/library/concurrent.futures.html#concurrent.futures.Future

    下方给了一个例子,一共有两个线程:thread_with_loopanother_thread,分别用于启动事件循环和调用 run_coroutine_threadsafe

    import asyncio
    import threading
    import time
    
    loop = None
    
    
    def get_loop():
        global loop
        if loop is None:
            loop = asyncio.new_event_loop()
        return loop
    
    
    def another_thread():
        async def coro_func():
            return 1
    
        loop = get_loop()
        # 将协程提交到另一个线程的事件循环中执行
        future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
        # 等待协程执行结果
        print(future.result())
        # 停止事件循环
        loop.call_soon_threadsafe(loop.stop)
    
    
    def thread_with_loop():
        loop = get_loop()
        # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用
        loop.run_forever()
        loop.close()
    
    
    # 启动一个线程,线程内部启动了一个事件循环
    threading.Thread(target=thread_with_loop).start()
    time.sleep(1)
    # 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行
    t = threading.Thread(target=another_thread)
    t.start()
    t.join()
    登录后复制

    loop.run_until_complete

    文档: https://docs.python.org/zh-cn/3.10/library/asyncio-eventloop.html#asyncio.loop.run_until_complete

    运行直到 future ( Future 的实例 ) 被完成。

    这个方法和 asyncio.run 类似。

    具体就是传入一个协程对象或者任务,然后可以直接拿到协程的返回值。

    run_until_complete 属于 loop 对象的方法,所以这个方法的使用前提是有一个事件循环,注意这个事件循环必须是非运行状态,如果是运行中就会抛出如下异常:

    RuntimeError: This event loop is already running

    例子:

    loop = asyncio.new_event_loop()
    loop.run_until_complete(do_async_work())
    登录后复制

    create_task

    文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#creating-tasks

    再次准确一点:要运行一个协程函数的本质是将携带协程函数的任务提交至事件循环中,由事件循环发现、调度并执行。

    其实一共就是满足两个条件:

    • 任务;

    • 事件循环。

    我们使用 async def func 定义的函数叫做协程函数func() 这样调用之后返回的结果是协程对象,到这一步协程函数内的代码都没有被执行,直到协程对象被包装成了任务,事件循环才会“正眼看它们”。

    所以事件循环调度运行的基本单元就是任务,那为什么我们在使用 async/await 这些语句时没有涉及到任务这个概念呢?

    这是因为 await 语法糖在内部将协程对象封装成了任务,再次强调事件循环只认识任务

    所以,想要运行一个协程对象,其实就是将协程对象封装成一个任务,至于事件循环是如何发现、调度和执行的,这个我们不用关心。

    那将协程封装成的任务的方法有哪些呢?

    • asyncio.create_task

    • asyncio.ensure_future

    • loop.create_task

    看着有好几个的,没关系,我们只关心 loop.create_task,因为其他方法最终都是调用 loop.create_task

    使用起来也是很简单的,将协程对象传入,返回值是一个任务对象。

    async def do_work():
        return 222
    
    task = loop.create_task(do_work())
    登录后复制

    do_work 会被异步执行,那么 do_work 的结果怎么获取呢,task.result() 可以吗?

    分情况:

    • 如果是在一个协程函数内使用 await task.result(),这是可以的;

    • 如果是在普通函数内则不行。你不可能立即获得协程函数的返回值,因为协程函数还没有被执行呢。

    asyncio.Task 运行使用 add_done_callback 添加完成时的回调函数,所以我们可以「曲线救国」,使用回调函数将结果添加到队列、Future 等等。

    我这里给个基于 concurrent.futures.Future 获取结果的例子,如下:

    import asyncio
    from asyncio import Task
    from concurrent.futures import Future
    
    from fastapi import FastAPI
    
    app = FastAPI()
    loop = asyncio.get_event_loop()
    
    
    async def do_work1():
        return 222
    
    
    @app.get("/")
    def root():
        # 新建一个 future 对象,用于接受结果值
        future = Future()
    
        # 提交任务至事件循环
        task = loop.create_task(do_work1())
    
        # 回调函数
        def done_callback(task: Task):
            # 设置结果
            future.set_result(task.result())
    
        # 为这个任务添加回调函数
        task.add_done_callback(done_callback)
    
        # future.result 会被阻塞,直到有结果返回为止
        return future.result()  # 222
    登录后复制

    以上是Python混合怎么使用同步和异步函数的详细内容。更多信息请关注PHP中文网其他相关文章!

    本站声明
    本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

    热AI工具

    Undresser.AI Undress

    Undresser.AI Undress

    人工智能驱动的应用程序,用于创建逼真的裸体照片

    AI Clothes Remover

    AI Clothes Remover

    用于从照片中去除衣服的在线人工智能工具。

    Undress AI Tool

    Undress AI Tool

    免费脱衣服图片

    Clothoff.io

    Clothoff.io

    AI脱衣机

    AI Hentai Generator

    AI Hentai Generator

    免费生成ai无尽的。

    热门文章

    R.E.P.O.能量晶体解释及其做什么(黄色晶体)
    4 周前 By 尊渡假赌尊渡假赌尊渡假赌
    R.E.P.O.最佳图形设置
    4 周前 By 尊渡假赌尊渡假赌尊渡假赌
    R.E.P.O.如果您听不到任何人,如何修复音频
    4 周前 By 尊渡假赌尊渡假赌尊渡假赌
    WWE 2K25:如何解锁Myrise中的所有内容
    1 个月前 By 尊渡假赌尊渡假赌尊渡假赌

    热工具

    记事本++7.3.1

    记事本++7.3.1

    好用且免费的代码编辑器

    SublimeText3汉化版

    SublimeText3汉化版

    中文版,非常好用

    禅工作室 13.0.1

    禅工作室 13.0.1

    功能强大的PHP集成开发环境

    Dreamweaver CS6

    Dreamweaver CS6

    视觉化网页开发工具

    SublimeText3 Mac版

    SublimeText3 Mac版

    神级代码编辑软件(SublimeText3)

    2小时的Python计划:一种现实的方法 2小时的Python计划:一种现实的方法 Apr 11, 2025 am 12:04 AM

    2小时内可以学会Python的基本编程概念和技能。1.学习变量和数据类型,2.掌握控制流(条件语句和循环),3.理解函数的定义和使用,4.通过简单示例和代码片段快速上手Python编程。

    redis怎么读取队列 redis怎么读取队列 Apr 10, 2025 pm 10:12 PM

    要从 Redis 读取队列,需要获取队列名称、使用 LPOP 命令读取元素,并处理空队列。具体步骤如下:获取队列名称:以 "queue:" 前缀命名,如 "queue:my-queue"。使用 LPOP 命令:从队列头部弹出元素并返回其值,如 LPOP queue:my-queue。处理空队列:如果队列为空,LPOP 返回 nil,可先检查队列是否存在再读取元素。

    Redis如何查看服务器版本 Redis如何查看服务器版本 Apr 10, 2025 pm 01:27 PM

    问题:如何查看 Redis 服务器版本?使用命令行工具 redis-cli --version 查看已连接服务器的版本。使用 INFO server 命令查看服务器内部版本,需解析返回信息。在集群环境下,检查每个节点的版本一致性,可使用脚本自动化检查。使用脚本自动化查看版本,例如用 Python 脚本连接并打印版本信息。

    redis怎么启动服务器 redis怎么启动服务器 Apr 10, 2025 pm 08:12 PM

    启动 Redis 服务器的步骤包括:根据操作系统安装 Redis。通过 redis-server(Linux/macOS)或 redis-server.exe(Windows)启动 Redis 服务。使用 redis-cli ping(Linux/macOS)或 redis-cli.exe ping(Windows)命令检查服务状态。使用 Redis 客户端,如 redis-cli、Python 或 Node.js,访问服务器。

    如何根据业务需求设置Redis内存大小? 如何根据业务需求设置Redis内存大小? Apr 10, 2025 pm 02:18 PM

    Redis 内存大小设置需要考虑以下因素:数据量及增长趋势:估算存储数据的大小和增长率。数据类型:不同类型(如列表、哈希)占用内存不同。缓存策略:全缓存、部分缓存和淘汰策略会影响内存使用。业务峰值:预留足够内存应对流量高峰。

    Redis内存配置参数有哪些? Redis内存配置参数有哪些? Apr 10, 2025 pm 02:03 PM

    **Redis内存配置的核心参数是 maxmemory,它限制 Redis 可使用内存量。当超过此限制时,Redis 根据 maxmemory-policy 执行淘汰策略,有:noeviction(直接拒绝写入)、allkeys-lru/volatile-lru(按LRU淘汰)、allkeys-random/volatile-random(随机淘汰)、volatile-ttl(按过期时间淘汰)。其他相关参数包括 maxmemory-samples(LRU采样数量)、rdb-compression

    Redis持久化对内存的影响是什么? Redis持久化对内存的影响是什么? Apr 10, 2025 pm 02:15 PM

    Redis持久化会额外占用内存,RDB在生成快照时临时增加内存占用,AOF在追加日志时持续占用内存。影响因素包括数据量、持久化策略和Redis配置。要减轻影响,可合理配置RDB快照策略、优化AOF配置、升级硬件和监控内存使用情况。此外,在性能和数据安全之间寻求平衡至关重要。

    Python vs.C:申请和用例 Python vs.C:申请和用例 Apr 12, 2025 am 12:01 AM

    Python适合数据科学、Web开发和自动化任务,而C 适用于系统编程、游戏开发和嵌入式系统。 Python以简洁和强大的生态系统着称,C 则以高性能和底层控制能力闻名。

    See all articles