Table des matières
Fonction de synchronisation des appels dans la fonction coroutine
在同步函数中调用异步函数
asyncio.run
asyncio.run_coroutine_threadsafe
loop.run_until_complete
create_task
Maison développement back-end Tutoriel Python Comment mélanger des fonctions synchrones et asynchrones en Python

Comment mélanger des fonctions synchrones et asynchrones en Python

May 12, 2023 am 09:58 AM
python

    Fonction de synchronisation des appels dans la fonction coroutine

    La fonction de synchronisation des appels directement dans la fonction coroutine bloquera la boucle d'événements, affectant ainsi les performances de l'ensemble du programme. Regardons d'abord un exemple :

    Ce qui suit est un exemple écrit à l'aide du framework Web asynchrone FastAPI. FastAPI est relativement rapide, mais les opérations incorrectes deviendront très lentes.

    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        time.sleep(10)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}
    Copier après la connexion

    Nous avons écrit deux interfaces ci-dessus. Supposons que la fonction d'interface root prenne 10 secondes. Si vous accédez à l'interface health dans ces 10 secondes, pensez à ce qui se passera ? root 接口函数耗时 10 秒,在这 10 秒内访问 health 接口,想一想会发生什么?

    Comment mélanger des fonctions synchrones et asynchrones en Python

    访问 root 接口(左),立即访问 health 接口(右),health 接口被阻塞,直至 root 接口返回后,health 接口才成功响应。

    time.sleep 就是一个「同步」函数,它会阻塞整个事件循环。

    如何解决呢?想一想以前的处理方法,如果一个函数会阻塞主线程,那么就再开一个线程让这个阻塞函数单独运行。所以,这里也是同理,开一个线程单独去运行那些阻塞式操作,比如读取文件等。

    loop.run_in_executor 方法将同步函数转换为异步非阻塞方式进行处理。具体来说,loop.run_in_executor() 可以将同步函数创建为一个线程进程,并在其中执行该函数,从而避免阻塞事件循环。

    官方例子:在线程或者进程池中执行代码。

    那么,我们使用 loop.run_in_executor 改写上面例子,如下:

    import asyncio
    import time
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        loop = asyncio.get_event_loop()
    
        def do_blocking_work():
            time.sleep(10)
            print("Done blocking work!!")
    
        await loop.run_in_executor(None, do_blocking_work)
        return {"message": "Hello World"}
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}
    Copier après la connexion

    效果如下:

    Comment mélanger des fonctions synchrones et asynchrones en Python

    root 接口被阻塞期间,health 依然正常访问互不影响。

    注意: 这里都是为了演示,实际在使用 FastAPI 开发时,你可以直接将 async def root 更换成 def root ,也就是将其换成同步接口函数,FastAPI 内部会自动创建线程处理这个同步接口函数。总的来说,FastAPI 内部也是依靠线程去处理同步函数从而避免阻塞主线程(或主线程中的事件循环)。

    在同步函数中调用异步函数

    协程只能在「事件循环」内被执行,且同一时刻只能有一个协程被执行。

    所以,在同步函数中调用异步函数,其本质就是将协程「扔进」事件循环中,等待该协程执行完获取结果即可。

    以下这些函数,都可以实现这个效果:

    • asyncio.run

    • asyncio.run_coroutine_threadsafe

    • loop.run_until_complete

    • create_task

    接下来,我们将一一讲解这些方法并举例说明。

    asyncio.run

    这个方法使用起来最简单,先看下如何使用,然后紧跟着讲一下哪些场景不能直接使用 asyncio.run

    import asyncio
    
    async def do_work():
        return 1
    
    def main():
        result = asyncio.run(do_work())
        print(result)  # 1
    
    if __name__ == "__main__":
        main()
    Copier après la connexion

    直接 run 就完事了,然后接受返回值即可。

    但是需要,注意的是 asyncio.run 每次调用都会新开一个事件循环,当结束时自动关闭该事件循环。

    一个线程内只存在一个事件循环,所以如果当前线程已经有存在的事件循环了,就不应该使用 asyncio.run 了,否则就会抛出如下异常:

    RuntimeError: asyncio.run() cannot be called from a running event loop

    因此,asyncio.run 用作新开一个事件循环时使用。

    asyncio.run_coroutine_threadsafe

    文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe

    向指定事件循环提交一个协程。(线程安全)
    返回一个 concurrent.futures.Future 以等待来自其他 OS 线程的结果。

    换句话说,就是将协程丢给其他线程中的事件循环去运行

    值得注意的是这里的「事件循环」应该是其他线程中的事件循环,非当前线程的事件循环。

    其返回的结果是一个 future 对象,如果你需要获取协程的执行结果可以使用 future.result()

    Comment utiliser les fonctions synchrones et asynchrones en Python🎜🎜Visite< code >root (à gauche), accédez immédiatement à l'interface health (à droite), l'interface health est bloquée jusqu'à ce que la root l'interface renvoie , l'interface health répond avec succès. 🎜🎜time.sleep est une fonction de "synchronisation" qui bloque toute la boucle des événements. 🎜🎜Comment le résoudre ? Pensez à la méthode de traitement précédente. Si une fonction bloque le thread principal, ouvrez un autre thread pour laisser la fonction de blocage s'exécuter seule. Par conséquent, le même principe s'applique ici. Ouvrez un thread pour exécuter des opérations de blocage séparément, telles que la lecture de fichiers, etc. La méthode 🎜🎜loop.run_in_executor convertit les fonctions synchrones en méthodes asynchrones non bloquantes pour le traitement. Plus précisément, loop.run_in_executor() peut créer une fonction de synchronisation en tant que un thread ou un processus et y exécuter la fonction pour éviter de bloquer la boucle d'événements. . 🎜
    🎜Exemple officiel : exécuter du code dans un pool de threads ou de processus. 🎜
    🎜Ensuite, nous utilisons loop.run_in_executor pour réécrire l'exemple ci-dessus comme suit : 🎜
    import asyncio
    import threading
    import time
    
    loop = None
    
    
    def get_loop():
        global loop
        if loop is None:
            loop = asyncio.new_event_loop()
        return loop
    
    
    def another_thread():
        async def coro_func():
            return 1
    
        loop = get_loop()
        # 将协程提交到另一个线程的事件循环中执行
        future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
        # 等待协程执行结果
        print(future.result())
        # 停止事件循环
        loop.call_soon_threadsafe(loop.stop)
    
    
    def thread_with_loop():
        loop = get_loop()
        # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用
        loop.run_forever()
        loop.close()
    
    
    # 启动一个线程,线程内部启动了一个事件循环
    threading.Thread(target=thread_with_loop).start()
    time.sleep(1)
    # 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行
    t = threading.Thread(target=another_thread)
    t.start()
    t.join()
    Copier après la connexion
    Copier après la connexion
    🎜L'effet est le suivant : 🎜🎜Comment utiliser les fonctions synchrones et asynchrones en Python🎜🎜root Alors que l'interface est bloquée, < code>health L'accès normal ne s'affecte toujours pas. 🎜
    🎜Remarque : Tout ceci est à titre de démonstration. Dans le développement réel utilisant FastAPI, vous pouvez directement remplacer async def root par def root</ code>. , c'est-à-dire le remplacer par une fonction d'interface synchrone. FastAPI créera automatiquement un thread en interne pour traiter cette fonction d'interface synchrone. En général, FastAPI s'appuie en interne sur des threads pour gérer les fonctions de synchronisation afin d'éviter de bloquer le thread principal (ou la boucle d'événements dans le thread principal). 🎜</blockquote>🎜Appelez des fonctions asynchrones dans des fonctions synchrones🎜🎜Les coroutines ne peuvent être exécutées que dans la "boucle d'événement", et une seule coroutine peut être exécutée en même temps. 🎜🎜Ainsi, l'essence de l'appel d'une fonction asynchrone dans une fonction synchrone est de "lancer" la coroutine dans la boucle d'événements et d'attendre que la coroutine termine son exécution pour obtenir le résultat. 🎜🎜Les fonctions suivantes peuvent obtenir cet effet : 🎜<ul class=" list-paddingleft-2"><li>🎜<code>asyncio.run🎜
  • 🎜asyncio .run_coroutine_threadsafe🎜
  • 🎜loop.run_until_complete🎜
  • 🎜create_task🎜
  • 🎜 🎜Suivant , nous expliquerons ces méthodes une par une et donnerons des exemples. 🎜

    asyncio.run

    🎜Cette méthode est la plus simple à utiliser. Voyons d'abord comment l'utiliser, puis parlons des scénarios qui ne peuvent pas être utilisés directement asyncio.run🎜.
    loop = asyncio.new_event_loop()
    loop.run_until_complete(do_async_work())
    Copier après la connexion
    Copier après la connexion
    🎜Directement run est effectué, puis acceptez la valeur de retour. 🎜🎜Mais il faut noter que asyncio.run ouvrira une nouvelle boucle d'événements à chaque fois qu'il sera appelé, et fermera automatiquement la boucle d'événements à la fin. 🎜🎜Il n'y a qu'une seule boucle d'événements dans un thread, donc si le thread actuel a déjà une boucle d'événements existante, vous ne devez pas utiliser asyncio.run, sinon il lancera L'exception suivante se produit : 🎜
    🎜RuntimeError : asyncio.run() ne peut pas être appelé à partir d'une boucle d'événement en cours d'exécution🎜
    🎜Par conséquent, asyncio.run est utilisé lors de l'ouverture d'un nouvel événement. utilisation en boucle. 🎜

    asyncio.run_coroutine_threadsafe

    🎜Documentation : https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe🎜
    🎜Vers l'événement spécifié loop Soumettez une coroutine. (Thread-safe)
    Renvoie un concurrent.futures.Future pour attendre les résultats d'autres threads du système d'exploitation. 🎜
    🎜En d'autres termes, lancez la coroutine dans la boucle d'événements dans d'autres threads pour l'exécuter. 🎜🎜Il convient de noter que la "boucle d'événements" ici doit être la boucle d'événements dans d'autres threads, et non la boucle d'événements du thread actuel. 🎜🎜Le résultat renvoyé est un objet futur. Si vous avez besoin d'obtenir le résultat de l'exécution de la coroutine, vous pouvez utiliser future.result() pour l'obtenir. Pour plus d'informations sur les objets futurs, voir https. ://docs .python.org/zh-cn/3/library/concurrent.futures.html#concurrent.futures.Future🎜

    下方给了一个例子,一共有两个线程:thread_with_loopanother_thread,分别用于启动事件循环和调用 run_coroutine_threadsafe

    import asyncio
    import threading
    import time
    
    loop = None
    
    
    def get_loop():
        global loop
        if loop is None:
            loop = asyncio.new_event_loop()
        return loop
    
    
    def another_thread():
        async def coro_func():
            return 1
    
        loop = get_loop()
        # 将协程提交到另一个线程的事件循环中执行
        future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
        # 等待协程执行结果
        print(future.result())
        # 停止事件循环
        loop.call_soon_threadsafe(loop.stop)
    
    
    def thread_with_loop():
        loop = get_loop()
        # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用
        loop.run_forever()
        loop.close()
    
    
    # 启动一个线程,线程内部启动了一个事件循环
    threading.Thread(target=thread_with_loop).start()
    time.sleep(1)
    # 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行
    t = threading.Thread(target=another_thread)
    t.start()
    t.join()
    Copier après la connexion
    Copier après la connexion

    loop.run_until_complete

    文档: https://docs.python.org/zh-cn/3.10/library/asyncio-eventloop.html#asyncio.loop.run_until_complete

    运行直到 future ( Future 的实例 ) 被完成。

    这个方法和 asyncio.run 类似。

    具体就是传入一个协程对象或者任务,然后可以直接拿到协程的返回值。

    run_until_complete 属于 loop 对象的方法,所以这个方法的使用前提是有一个事件循环,注意这个事件循环必须是非运行状态,如果是运行中就会抛出如下异常:

    RuntimeError: This event loop is already running

    例子:

    loop = asyncio.new_event_loop()
    loop.run_until_complete(do_async_work())
    Copier après la connexion
    Copier après la connexion

    create_task

    文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#creating-tasks

    再次准确一点:要运行一个协程函数的本质是将携带协程函数的任务提交至事件循环中,由事件循环发现、调度并执行。

    其实一共就是满足两个条件:

    • 任务;

    • 事件循环。

    我们使用 async def func 定义的函数叫做协程函数func() 这样调用之后返回的结果是协程对象,到这一步协程函数内的代码都没有被执行,直到协程对象被包装成了任务,事件循环才会“正眼看它们”。

    所以事件循环调度运行的基本单元就是任务,那为什么我们在使用 async/await 这些语句时没有涉及到任务这个概念呢?

    这是因为 await 语法糖在内部将协程对象封装成了任务,再次强调事件循环只认识任务

    所以,想要运行一个协程对象,其实就是将协程对象封装成一个任务,至于事件循环是如何发现、调度和执行的,这个我们不用关心。

    那将协程封装成的任务的方法有哪些呢?

    • asyncio.create_task

    • asyncio.ensure_future

    • loop.create_task

    看着有好几个的,没关系,我们只关心 loop.create_task,因为其他方法最终都是调用 loop.create_task

    使用起来也是很简单的,将协程对象传入,返回值是一个任务对象。

    async def do_work():
        return 222
    
    task = loop.create_task(do_work())
    Copier après la connexion

    do_work 会被异步执行,那么 do_work 的结果怎么获取呢,task.result() 可以吗?

    分情况:

    • 如果是在一个协程函数内使用 await task.result(),这是可以的;

    • 如果是在普通函数内则不行。你不可能立即获得协程函数的返回值,因为协程函数还没有被执行呢。

    asyncio.Task 运行使用 add_done_callback 添加完成时的回调函数,所以我们可以「曲线救国」,使用回调函数将结果添加到队列、Future 等等。

    我这里给个基于 concurrent.futures.Future 获取结果的例子,如下:

    import asyncio
    from asyncio import Task
    from concurrent.futures import Future
    
    from fastapi import FastAPI
    
    app = FastAPI()
    loop = asyncio.get_event_loop()
    
    
    async def do_work1():
        return 222
    
    
    @app.get("/")
    def root():
        # 新建一个 future 对象,用于接受结果值
        future = Future()
    
        # 提交任务至事件循环
        task = loop.create_task(do_work1())
    
        # 回调函数
        def done_callback(task: Task):
            # 设置结果
            future.set_result(task.result())
    
        # 为这个任务添加回调函数
        task.add_done_callback(done_callback)
    
        # future.result 会被阻塞,直到有结果返回为止
        return future.result()  # 222
    Copier après la connexion

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Déclaration de ce site Web
    Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

    Outils d'IA chauds

    Undresser.AI Undress

    Undresser.AI Undress

    Application basée sur l'IA pour créer des photos de nu réalistes

    AI Clothes Remover

    AI Clothes Remover

    Outil d'IA en ligne pour supprimer les vêtements des photos.

    Undress AI Tool

    Undress AI Tool

    Images de déshabillage gratuites

    Clothoff.io

    Clothoff.io

    Dissolvant de vêtements AI

    AI Hentai Generator

    AI Hentai Generator

    Générez AI Hentai gratuitement.

    Article chaud

    R.E.P.O. Crystals d'énergie expliqués et ce qu'ils font (cristal jaune)
    1 Il y a quelques mois By 尊渡假赌尊渡假赌尊渡假赌
    R.E.P.O. Meilleurs paramètres graphiques
    1 Il y a quelques mois By 尊渡假赌尊渡假赌尊渡假赌
    Will R.E.P.O. Vous avez un jeu croisé?
    1 Il y a quelques mois By 尊渡假赌尊渡假赌尊渡假赌

    Outils chauds

    Bloc-notes++7.3.1

    Bloc-notes++7.3.1

    Éditeur de code facile à utiliser et gratuit

    SublimeText3 version chinoise

    SublimeText3 version chinoise

    Version chinoise, très simple à utiliser

    Envoyer Studio 13.0.1

    Envoyer Studio 13.0.1

    Puissant environnement de développement intégré PHP

    Dreamweaver CS6

    Dreamweaver CS6

    Outils de développement Web visuel

    SublimeText3 version Mac

    SublimeText3 version Mac

    Logiciel d'édition de code au niveau de Dieu (SublimeText3)

    PHP et Python: exemples de code et comparaison PHP et Python: exemples de code et comparaison Apr 15, 2025 am 12:07 AM

    PHP et Python ont leurs propres avantages et inconvénients, et le choix dépend des besoins du projet et des préférences personnelles. 1.Php convient au développement rapide et à la maintenance des applications Web à grande échelle. 2. Python domine le domaine de la science des données et de l'apprentissage automatique.

    Comment est la prise en charge du GPU pour Pytorch sur Centos Comment est la prise en charge du GPU pour Pytorch sur Centos Apr 14, 2025 pm 06:48 PM

    Activer l'accélération du GPU Pytorch sur le système CentOS nécessite l'installation de versions CUDA, CUDNN et GPU de Pytorch. Les étapes suivantes vous guideront tout au long du processus: CUDA et CUDNN Installation détermineront la compatibilité de la version CUDA: utilisez la commande NVIDIA-SMI pour afficher la version CUDA prise en charge par votre carte graphique NVIDIA. Par exemple, votre carte graphique MX450 peut prendre en charge CUDA11.1 ou plus. Téléchargez et installez Cudatoolkit: visitez le site officiel de Nvidiacudatoolkit et téléchargez et installez la version correspondante selon la version CUDA la plus élevée prise en charge par votre carte graphique. Installez la bibliothèque CUDNN:

    Explication détaillée du principe docker Explication détaillée du principe docker Apr 14, 2025 pm 11:57 PM

    Docker utilise les fonctionnalités du noyau Linux pour fournir un environnement de fonctionnement d'application efficace et isolé. Son principe de travail est le suivant: 1. Le miroir est utilisé comme modèle en lecture seule, qui contient tout ce dont vous avez besoin pour exécuter l'application; 2. Le Système de fichiers Union (UnionFS) empile plusieurs systèmes de fichiers, ne stockant que les différences, l'économie d'espace et l'accélération; 3. Le démon gère les miroirs et les conteneurs, et le client les utilise pour l'interaction; 4. Les espaces de noms et les CGROUP implémentent l'isolement des conteneurs et les limitations de ressources; 5. Modes de réseau multiples prennent en charge l'interconnexion du conteneur. Ce n'est qu'en comprenant ces concepts principaux que vous pouvez mieux utiliser Docker.

    Python vs JavaScript: communauté, bibliothèques et ressources Python vs JavaScript: communauté, bibliothèques et ressources Apr 15, 2025 am 12:16 AM

    Python et JavaScript ont leurs propres avantages et inconvénients en termes de communauté, de bibliothèques et de ressources. 1) La communauté Python est amicale et adaptée aux débutants, mais les ressources de développement frontal ne sont pas aussi riches que JavaScript. 2) Python est puissant dans les bibliothèques de science des données et d'apprentissage automatique, tandis que JavaScript est meilleur dans les bibliothèques et les cadres de développement frontaux. 3) Les deux ont des ressources d'apprentissage riches, mais Python convient pour commencer par des documents officiels, tandis que JavaScript est meilleur avec MDNWEBDOCS. Le choix doit être basé sur les besoins du projet et les intérêts personnels.

    Miniopen Centos Compatibilité Miniopen Centos Compatibilité Apr 14, 2025 pm 05:45 PM

    Minio Object Storage: Déploiement haute performance dans le système Centos System Minio est un système de stockage d'objets distribué haute performance développé sur la base du langage Go, compatible avec Amazons3. Il prend en charge une variété de langages clients, notamment Java, Python, JavaScript et GO. Cet article introduira brièvement l'installation et la compatibilité de Minio sur les systèmes CentOS. Compatibilité de la version CentOS Minio a été vérifiée sur plusieurs versions CentOS, y compris, mais sans s'y limiter: CentOS7.9: fournit un guide d'installation complet couvrant la configuration du cluster, la préparation de l'environnement, les paramètres de fichiers de configuration, le partitionnement du disque et la mini

    Comment faire fonctionner la formation distribuée de Pytorch sur CentOS Comment faire fonctionner la formation distribuée de Pytorch sur CentOS Apr 14, 2025 pm 06:36 PM

    La formation distribuée par Pytorch sur le système CentOS nécessite les étapes suivantes: Installation de Pytorch: La prémisse est que Python et PIP sont installés dans le système CentOS. Selon votre version CUDA, obtenez la commande d'installation appropriée sur le site officiel de Pytorch. Pour la formation du processeur uniquement, vous pouvez utiliser la commande suivante: pipinstalltorchtorchVisionTorChaudio Si vous avez besoin d'une prise en charge du GPU, assurez-vous que la version correspondante de CUDA et CUDNN est installée et utilise la version Pytorch correspondante pour l'installation. Configuration de l'environnement distribué: la formation distribuée nécessite généralement plusieurs machines ou des GPU multiples uniques. Lieu

    Comment choisir la version Pytorch sur Centos Comment choisir la version Pytorch sur Centos Apr 14, 2025 pm 06:51 PM

    Lors de l'installation de Pytorch sur le système CentOS, vous devez sélectionner soigneusement la version appropriée et considérer les facteurs clés suivants: 1. Compatibilité de l'environnement du système: Système d'exploitation: Il est recommandé d'utiliser CentOS7 ou plus. CUDA et CUDNN: La version Pytorch et la version CUDA sont étroitement liées. Par exemple, Pytorch1.9.0 nécessite CUDA11.1, tandis que Pytorch2.0.1 nécessite CUDA11.3. La version CUDNN doit également correspondre à la version CUDA. Avant de sélectionner la version Pytorch, assurez-vous de confirmer que des versions compatibles CUDA et CUDNN ont été installées. Version Python: branche officielle de Pytorch

    Comment installer nginx dans Centos Comment installer nginx dans Centos Apr 14, 2025 pm 08:06 PM

    CENTOS L'installation de Nginx nécessite de suivre les étapes suivantes: Installation de dépendances telles que les outils de développement, le devet PCRE et l'OpenSSL. Téléchargez le package de code source Nginx, dézippez-le et compilez-le et installez-le, et spécifiez le chemin d'installation AS / USR / LOCAL / NGINX. Créez des utilisateurs et des groupes d'utilisateurs de Nginx et définissez les autorisations. Modifiez le fichier de configuration nginx.conf et configurez le port d'écoute et le nom de domaine / adresse IP. Démarrez le service Nginx. Les erreurs communes doivent être prêtées à prêter attention, telles que les problèmes de dépendance, les conflits de port et les erreurs de fichiers de configuration. L'optimisation des performances doit être ajustée en fonction de la situation spécifique, comme l'activation du cache et l'ajustement du nombre de processus de travail.

    See all articles