Python で同期関数と非同期関数を混合する方法
コルーチン関数で同期関数を呼び出す
コルーチン関数で同期関数を直接呼び出すとイベント ループがブロックされ、そのため、プログラム全体。まず例を見てみましょう:
次は、非同期 Web フレームワーク FastAPI を使用して記述された例です。FastAPI は比較的高速ですが、誤った操作を行うと非常に遅くなります。
import time from fastapi import FastAPI app = FastAPI() @app.get("/") async def root(): time.sleep(10) return {"message": "Hello World"} @app.get("/health") async def health(): return {"status": "ok"}
上記では 2 つのインターフェイスを作成しました。root
インターフェイス関数には 10 秒かかると仮定します。この 10 秒の間に、health
インターフェイスがアクセスされます。何が起こるでしょうか?
root
インターフェイス (左) にアクセスし、すぐに health
インターフェイス (右) にアクセスし、health
インターフェイスにアクセスします。 root
インターフェイスが戻るまでブロックされますが、health
インターフェイスは正常に応答します。
time.sleep
は、イベント ループ全体をブロックする「同期」関数です。
どうすれば解決できますか?前述の処理方法について考えてみましょう。関数がメイン スレッドをブロックしている場合は、別のスレッドを開いてブロックしている関数を単独で実行させます。したがって、同じ原則がここにも適用され、スレッドを開いて、ファイルの読み取りなどのブロック操作を個別に実行します。
loop.run_in_executor
メソッドは、同期関数を非同期ノンブロッキング モードに変換して処理します。具体的には、 loop.run_in_executor()
同期関数を a thread または process として作成し、その中で関数を実行することで、イベント ループのブロックを回避できます。
正式な例: スレッドまたはプロセス プールでコードを実行します。
次に、loop.run_in_executor
を使用して、上記の例を次のように書き換えます:
import asyncio import time from fastapi import FastAPI app = FastAPI() @app.get("/") async def root(): loop = asyncio.get_event_loop() def do_blocking_work(): time.sleep(10) print("Done blocking work!!") await loop.run_in_executor(None, do_blocking_work) return {"message": "Hello World"} @app.get("/health") async def health(): return {"status": "ok"}
結果は次のようになります:
root
インターフェイスがブロックされていても、health
は相互に影響を与えることなく通常どおりアクセスできます。
同期関数内で非同期関数を呼び出すコルーチンは「イベント ループ」内でのみ実行でき、同時に実行できるコルーチンは 1 つだけです。 したがって、同期関数内で非同期関数を呼び出す本質は、コルーチンをイベント ループに「スロー」し、コルーチンが実行を終了して結果を取得するのを待つことです。 次の関数でこの効果を実現できます:注: これはすべてデモンストレーション用です。実際に FastAPI を使用して開発する場合は、
async def root
をdef root ## に直接置き換えることができます。 # つまり、同期インターフェイス関数に置き換えます。FastAPI は、この同期インターフェイス関数を処理するためのスレッドを内部で自動的に作成します。一般に、FastAPI は、メイン スレッド (またはメイン スレッド内のイベント ループ) のブロックを回避するために、内部的にスレッドに依存して同期関数を処理します。
asyncio.run
asyncio.run_coroutine_threadsafe
loop.run_until_complete
create_task
##次に、これらの方法を 1 つずつ説明し、例を示します。
asyncio.run
import asyncio async def do_work(): return 1 def main(): result = asyncio.run(do_work()) print(result) # 1 if __name__ == "__main__": main()
Just
run が完了し、戻り値を受け入れます。 ただし、
asyncio.run
スレッドにはイベント ループ
asyncio.run を使用しないでください。次の例外がスローされます: RuntimeError: asyncio.run() は実行中のイベント ループから呼び出すことはできません
新しいイベント ループを開くときに使用されます。したがって、
asyncio.run
asyncio.run_coroutine_threadsafe
指定されたイベント ループにコルーチンを送信します。 (スレッドセーフ)
concurrent.futures.Futureを返し、他の OS スレッドからの結果を待ちます。。
つまり、
コルーチンを他のスレッドのイベント ループにスローして実行します
ここでの「イベント ループ」は、現在のスレッドのイベント ループではなく、他のスレッドのイベント ループである必要があることに注意してください。 返された結果は、future オブジェクトです。コルーチンの実行結果を取得する必要がある場合は、
future.result()を使用して取得できます。future オブジェクトの詳細については、 、https://docs.python.org/zh-cn/3/library/concurrent.futures.html#concurrent.futures.Future
を参照してください。下方给了一个例子,一共有两个线程:thread_with_loop
和 another_thread
,分别用于启动事件循环和调用 run_coroutine_threadsafe
import asyncio import threading import time loop = None def get_loop(): global loop if loop is None: loop = asyncio.new_event_loop() return loop def another_thread(): async def coro_func(): return 1 loop = get_loop() # 将协程提交到另一个线程的事件循环中执行 future = asyncio.run_coroutine_threadsafe(coro_func(), loop) # 等待协程执行结果 print(future.result()) # 停止事件循环 loop.call_soon_threadsafe(loop.stop) def thread_with_loop(): loop = get_loop() # 启动事件循环,确保事件循环不会退出,直到 loop.stop() 被调用 loop.run_forever() loop.close() # 启动一个线程,线程内部启动了一个事件循环 threading.Thread(target=thread_with_loop).start() time.sleep(1) # 在主线程中启动一个协程, 并将协程提交到另一个线程的事件循环中执行 t = threading.Thread(target=another_thread) t.start() t.join()
loop.run_until_complete
文档: https://docs.python.org/zh-cn/3.10/library/asyncio-eventloop.html#asyncio.loop.run_until_complete
运行直到 future (
Future
的实例 ) 被完成。
这个方法和
asyncio.run
类似。
具体就是传入一个协程对象或者任务,然后可以直接拿到协程的返回值。
run_until_complete
属于 loop
对象的方法,所以这个方法的使用前提是有一个事件循环,注意这个事件循环必须是非运行状态,如果是运行中就会抛出如下异常:
RuntimeError: This event loop is already running
例子:
loop = asyncio.new_event_loop() loop.run_until_complete(do_async_work())
create_task
文档: https://docs.python.org/zh-cn/3/library/asyncio-task.html#creating-tasks
再次准确一点:要运行一个协程函数的本质是将携带协程函数的任务提交至事件循环中,由事件循环发现、调度并执行。
其实一共就是满足两个条件:
任务;
事件循环。
我们使用 async def func
定义的函数叫做协程函数,func()
这样调用之后返回的结果是协程对象,到这一步协程函数内的代码都没有被执行,直到协程对象被包装成了任务,事件循环才会“正眼看它们”。
所以事件循环调度运行的基本单元就是任务,那为什么我们在使用 async/await
这些语句时没有涉及到任务这个概念呢?
这是因为 await
语法糖在内部将协程对象封装成了任务,再次强调事件循环只认识任务。
所以,想要运行一个协程对象,其实就是将协程对象封装成一个任务,至于事件循环是如何发现、调度和执行的,这个我们不用关心。
那将协程封装成的任务的方法有哪些呢?
asyncio.create_task
asyncio.ensure_future
loop.create_task
看着有好几个的,没关系,我们只关心 loop.create_task
,因为其他方法最终都是调用 loop.create_task
。
使用起来也是很简单的,将协程对象传入,返回值是一个任务对象。
async def do_work(): return 222 task = loop.create_task(do_work())
do_work
会被异步执行,那么 do_work 的结果怎么获取呢,task.result()
可以吗?
分情况:
如果是在一个协程函数内使用
await task.result()
,这是可以的;如果是在普通函数内则不行。你不可能立即获得协程函数的返回值,因为协程函数还没有被执行呢。
asyncio.Task 运行使用 add_done_callback 添加完成时的回调函数,所以我们可以「曲线救国」,使用回调函数将结果添加到队列、Future 等等。
我这里给个基于 concurrent.futures.Future
获取结果的例子,如下:
import asyncio from asyncio import Task from concurrent.futures import Future from fastapi import FastAPI app = FastAPI() loop = asyncio.get_event_loop() async def do_work1(): return 222 @app.get("/") def root(): # 新建一个 future 对象,用于接受结果值 future = Future() # 提交任务至事件循环 task = loop.create_task(do_work1()) # 回调函数 def done_callback(task: Task): # 设置结果 future.set_result(task.result()) # 为这个任务添加回调函数 task.add_done_callback(done_callback) # future.result 会被阻塞,直到有结果返回为止 return future.result() # 222
以上がPython で同期関数と非同期関数を混合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









PHPとPythonには独自の利点と短所があり、選択はプロジェクトのニーズと個人的な好みに依存します。 1.PHPは、大規模なWebアプリケーションの迅速な開発とメンテナンスに適しています。 2。Pythonは、データサイエンスと機械学習の分野を支配しています。

Pytorch GPUアクセラレーションを有効にすることで、CentOSシステムでは、PytorchのCUDA、CUDNN、およびGPUバージョンのインストールが必要です。次の手順では、プロセスをガイドします。CUDAおよびCUDNNのインストールでは、CUDAバージョンの互換性が決定されます。NVIDIA-SMIコマンドを使用して、NVIDIAグラフィックスカードでサポートされているCUDAバージョンを表示します。たとえば、MX450グラフィックカードはCUDA11.1以上をサポートする場合があります。 cudatoolkitのダウンロードとインストール:nvidiacudatoolkitの公式Webサイトにアクセスし、グラフィックカードでサポートされている最高のCUDAバージョンに従って、対応するバージョンをダウンロードしてインストールします。 cudnnライブラリをインストールする:

DockerはLinuxカーネル機能を使用して、効率的で孤立したアプリケーションランニング環境を提供します。その作業原則は次のとおりです。1。ミラーは、アプリケーションを実行するために必要なすべてを含む読み取り専用テンプレートとして使用されます。 2。ユニオンファイルシステム(UnionFS)は、違いを保存するだけで、スペースを節約し、高速化する複数のファイルシステムをスタックします。 3.デーモンはミラーとコンテナを管理し、クライアントはそれらをインタラクションに使用します。 4。名前空間とcgroupsは、コンテナの分離とリソースの制限を実装します。 5.複数のネットワークモードは、コンテナの相互接続をサポートします。これらのコア概念を理解することによってのみ、Dockerをよりよく利用できます。

PythonとJavaScriptには、コミュニティ、ライブラリ、リソースの観点から、独自の利点と短所があります。 1)Pythonコミュニティはフレンドリーで初心者に適していますが、フロントエンドの開発リソースはJavaScriptほど豊富ではありません。 2)Pythonはデータサイエンスおよび機械学習ライブラリで強力ですが、JavaScriptはフロントエンド開発ライブラリとフレームワークで優れています。 3)どちらも豊富な学習リソースを持っていますが、Pythonは公式文書から始めるのに適していますが、JavaScriptはMDNWebDocsにより優れています。選択は、プロジェクトのニーズと個人的な関心に基づいている必要があります。

MINIOオブジェクトストレージ:CENTOSシステムの下での高性能展開Minioは、Amazons3と互換性のあるGO言語に基づいて開発された高性能の分散オブジェクトストレージシステムです。 Java、Python、JavaScript、Goなど、さまざまなクライアント言語をサポートしています。この記事では、CentosシステムへのMinioのインストールと互換性を簡単に紹介します。 Centosバージョンの互換性Minioは、Centos7.9を含むがこれらに限定されない複数のCentosバージョンで検証されています。

Pytorchの分散トレーニングでは、Centosシステムでトレーニングには次の手順が必要です。Pytorchのインストール:PythonとPipがCentosシステムにインストールされていることです。 CUDAバージョンに応じて、Pytorchの公式Webサイトから適切なインストールコマンドを入手してください。 CPUのみのトレーニングには、次のコマンドを使用できます。PipinstalltorchtorchtorchvisionTorchaudioGPUサポートが必要な場合は、CUDAとCUDNNの対応するバージョンがインストールされ、インストールに対応するPytorchバージョンを使用してください。分散環境構成:分散トレーニングには、通常、複数のマシンまたは単一マシンの複数GPUが必要です。場所

PytorchをCentosシステムにインストールする場合、適切なバージョンを慎重に選択し、次の重要な要因を検討する必要があります。1。システム環境互換性:オペレーティングシステム:Centos7以上を使用することをお勧めします。 Cuda and Cudnn:PytorchバージョンとCudaバージョンは密接に関連しています。たとえば、pytorch1.9.0にはcuda11.1が必要ですが、pytorch2.0.1にはcuda11.3が必要です。 CUDNNバージョンは、CUDAバージョンとも一致する必要があります。 Pytorchバージョンを選択する前に、互換性のあるCUDAおよびCUDNNバージョンがインストールされていることを確認してください。 Pythonバージョン:Pytorch公式支店

NGINXのインストールをインストールするには、次の手順に従う必要があります。開発ツール、PCRE-Devel、OpenSSL-Develなどの依存関係のインストール。 nginxソースコードパッケージをダウンロードし、それを解凍してコンパイルしてインストールし、/usr/local/nginxとしてインストールパスを指定します。 nginxユーザーとユーザーグループを作成し、アクセス許可を設定します。構成ファイルnginx.confを変更し、リスニングポートとドメイン名/IPアドレスを構成します。 nginxサービスを開始します。依存関係の問題、ポート競合、構成ファイルエラーなど、一般的なエラーに注意する必要があります。パフォーマンスの最適化は、キャッシュをオンにしたり、ワーカープロセスの数を調整するなど、特定の状況に応じて調整する必要があります。
