使用 Asyncio 建立和管理任務

王林
發布: 2024-08-15 06:37:08
原創
577 人瀏覽過

Creating and Managing Tasks with Asyncio

Asyncio 讓開發人員輕鬆地用 Python 編寫非同步程式。該模組還提供了多種非同步任務的方法,並且由於執行方法多種多樣,因此可能會讓人困惑於使用哪一種。

在本文中,我們將討論使用 asyncio 建立和管理任務的多種方法。

什麼是異步任務?

在 asyncio 中,任務 是一個包裝協程並安排其在事件循環內運行的物件。簡而言之,任務是一種與其他任務同時運行協程的方式。建立任務後,事件循環將運行它,並根據需要暫停和恢復它以允許其他任務運行。

建立和管理 Asyncio 任務的方法

現在,我們可以討論建立和管理任務的方法。首先,要使用 asyncio 在 Python 中建立任務,請使用 asyncio.create_task 方法,該方法採用以下參數:

  • coro(必填):要調度的協程物件。這是您想要非同步運行的函數。

  • name(可選):可用於偵錯或日誌記錄目的的任務名稱。您可以為此參數指派一個字串。

    • 您也可以稍後使用 Task.set_name(name) 和 Task.get_name() 設定或取得名稱。
  • context(可選):在Python 3.11中引入,用於設定任務的上下文變量,從而啟用任務本地儲存。它類似於線程本地存儲,但用於非同步任務。

    • 除非您正在處理需要上下文管理的高階場景,否則此參數並不常用。

這是asyncio.create_task的用法範例:

import asyncio

# Define a coroutine
async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O-bound operation
    print(f"Hello, {name}!")

async def main():
    # Create tasks
    task1 = asyncio.create_task(greet("Alice"), name="GreetingAlice")
    task2 = asyncio.create_task(greet("Bob"), name="GreetingBob")

    # Check task names
    print(f"Task 1 name: {task1.get_name()}")
    print(f"Task 2 name: {task2.get_name()}")

    # Wait for both tasks to complete
    await task1
    await task2

# Run the main function
asyncio.run(main())
登入後複製

建立任務時,可以執行許多方法,例如:

  • .cancel():取消任務。

  • .add_done_callback(cb):新增任務完成時執行的回呼函數。

  • .done():檢查任務是否完成。

  • .result():任務完成後擷取結果。

現在我們了解如何建立任務,讓我們看看如何處理等待一個任務或多個任務。

等待任務完成

在本節中,我們將討論如何等待一個或多個任務的任務完成。非同步程式設計基於這樣一個事實:如果正在運行非同步任務,我們可以繼續執行程式。有時您可能想要更好地控制流程,並希望確保在安全地繼續執行程序之前獲得可以使用的結果。

要等待單一任務完成,可以使用 asyncio.wait_for。它需要兩個參數:

  • awaitable(必備):這是您想要等待的協程、任務或未來。它可以是任何可以等待的對象,例如協程函數呼叫、asyncio.Task 或 asyncio.Future。

  • 逾時(可選):指定等待 aw 完成的最大秒數。如果達到超時並且等待尚未完成,asyncio.wait_for 會引發 TimeoutError。如果超時設定為 None,則函數將無限期地等待等待完成。

以下是使用此方法的範例:

import asyncio

async def slow_task():
    print("Task started...")
    await asyncio.sleep(5)  # Simulating a long-running task
    print("Task finished!")
    return "Completed"

async def main():
    try:
        # Wait for slow_task to finish within 2 seconds
        result = await asyncio.wait_for(slow_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("The task took too long and was canceled!")

asyncio.run(main())
登入後複製

在上面的程式碼中,slow_task() 是一個協程,透過休眠 5 秒來模擬長時間運行的任務。 asyncio.wait_for(slow_task(), timeout=2) 行等待任務完成,但將等待時間限制為 2 秒,導致逾時,因為任務需要更長的時間。當超過逾時時,會引發 TimeoutError,任務被取消,並透過列印指示任務花費太長時間的訊息來處理異常。

我們還可以等待多個或一組任務完成。這可以使用 asyncio.wait、asyncio.gather 或 asyncio.as_completed 來實現。讓我們探索一下每種方法。

非同步等待

asyncio.wait 方法等待一組任務並傳回兩組:一組用於已完成的任務,一組用於待處理的任務。它需要以下參數:

  • aws(必需,可迭代):您想要等待的協程物件、任務或 future 的集合。

  • 超時(float 或 None,可選):等待的最大秒數。如果未提供,則會無限期等待。

  • return_when(常數,可選):指定 asyncio.wait 何時返回。選項包括:

    • asyncio.ALL_COMPLETED (default): Returns when all tasks are complete.
    • asyncio.FIRST_COMPLETED: Returns when the first task is completed.
    • asyncio.FIRST_EXCEPTION: Returns when the first task raises an exception.

Let's see how it is used in an example.

import asyncio
import random

async def task():
    await asyncio.sleep(random.uniform(1, 3))

async def main():
    tasks = [asyncio.create_task(task()) for _ in range(3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"Done tasks: {len(done)}, Pending tasks: {len(pending)}")

asyncio.run(main())
登入後複製

In the code above, asyncio.wait waits for a group of tasks and returns two sets: one with completed tasks and another with those still pending. You can control when it returns, such as after the first task is completed or after all tasks are done. In the example, asyncio.wait returns when the first task is completed, leaving the rest in the pending set.

asyncio.gather

The asyncio.gather method runs multiple awaitable objects concurrently and returns a list of their results, optionally handling exceptions. Let's see the arguments it takes.

  • *aws (required, multiple awaitables): A variable number of awaitable objects (like coroutines, tasks, or futures) to run concurrently.

  • return_exceptions (bool, optional): If True, exceptions in the tasks will be returned as part of the results list instead of being raised.

Let's see how it can be used in an example.

import asyncio
import random

async def task(id):
    await asyncio.sleep(random.uniform(1, 3))
    return f"Task {id} done"

async def main():
    results = await asyncio.gather(task(1), task(2), task(3))
    print(results)

asyncio.run(main())
登入後複製

In the code above, asyncio.gather runs multiple awaitable objects concurrently and returns a list of their results in the order they were passed in. It allows you to handle exceptions gracefully if return_exceptions is set to True. In the example, three tasks are run simultaneously, and their results are returned in a list once all tasks are complete.

asyncio.as_completed

The asyncio.as_completed method is used to return an iterator that yields tasks as they are completed, allowing results to be processed immediately. It takes the following arguments:

  • aws (iterable of awaitables): A collection of coroutine objects, tasks, or futures.

  • timeout (float or None, optional): The maximum number of seconds to wait for tasks to complete. If not provided, it waits indefinitely.

Example

import asyncio
import random

async def task(id):
    await asyncio.sleep(random.uniform(1, 3))
    return f"Task {id} done"

async def main():
    tasks = [task(i) for i in range(3)]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())
登入後複製

In the example above, asyncio.as_completed returns an iterator that yields results as each task completes, allowing you to process them immediately. This is useful when you want to handle results as soon as they're available, rather than waiting for all tasks to finish. In the example, the tasks are run simultaneously, and their results are printed as each one finishes, in the order they complete.

So to make a summary, you use:

  • asyncio.wait: when you need to handle multiple tasks and want to track which tasks are completed and which are still pending. It's useful when you care about the status of each task separately.

  • asyncio.gather: when you want to run multiple tasks concurrently and need the results in a list, especially when the order of results matters or you need to handle exceptions gracefully.

  • asyncio.as_completed: when you want to process results as soon as each task finishes, rather than waiting for all tasks to complete. It’s useful for handling results in the order they become available.

However, these methods don't take atomic task management with built-in error handling. In the next section, we will see about asyncio.TaskGroup and how to use it to manage a group of tasks.

asyncio.TaskGroup

asyncio.TaskGroup is a context manager introduced in Python 3.11 that simplifies managing multiple tasks as a group. It ensures that if any task within the group fails, all other tasks are canceled, providing a way to handle complex task management with robust error handling. The class has one method called created_task used to create and add tasks to the task group. You pass a coroutine to this method, and it returns an asyncio.Task object that is managed by the group.

Here is an example of how it is used:

import asyncio

async def task1():
    await asyncio.sleep(1)
    return "Task 1 done"

async def task2():
    await asyncio.sleep(2)
    return "Task 2 done"

async def task_with_error():
    await asyncio.sleep(1)
    raise ValueError("An error occurred")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(task1())
            task2 = tg.create_task(task2())
            error_task = tg.create_task(task_with_error())
    except Exception as e:
        print(f"Error: {e}")

    # Print results from completed tasks
    print("Task 1 result:", task1.result())
    print("Task 2 result:", task2.result())

asyncio.run(main())
登入後複製

asyncio.TaskGroup manages multiple tasks and ensures that if any task fails, all other tasks in the group are canceled. In the example, a task with an error causes the entire group to be canceled, and only the results of completed tasks are printed.

Usage for this can be in web scraping. You can use asyncio.TaskGroup to handle multiple concurrent API requests and ensure that if any request fails, all other requests are canceled to avoid incomplete data.

We are at the end of the article and we have learned the multiple methods asyncio provides to create and manage tasks. Here is a summary of the methods:

  • asyncio.wait_for: Wait for a task with a timeout.

  • asyncio.wait: Wait for multiple tasks with flexible completion conditions.

  • asyncio.gather: Aggregate multiple tasks into a single awaitable.

  • asyncio.as_completed:在任務完成時處理任務。

  • asyncio.TaskGroup:管理一組任務,並在失敗時自動取消。

結論

非同步程式設計可以改變您在 Python 中處理並發任務的方式,讓您的程式碼更有效率、反應更快。在本文中,我們瀏覽了 asyncio 提供的各種方法來建立和管理任務,從簡單的超時到複雜的任務組。了解何時以及如何使用每種方法(asyncio.wait_for、asyncio.wait、asyncio.gather、asyncio.as_completed 和asyncio.TaskGroup)將幫助您充分利用非同步程式設計的潛力,使您的應用程式更加健壯和可擴展。

要更深入地了解非同步程式設計和更多實際範例,請在此處瀏覽我們的詳細指南。

如果您喜歡這篇文章,請考慮訂閱我的時事通訊,這樣您就不會錯過未來的更新。

編碼愉快!

以上是使用 Asyncio 建立和管理任務的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:dev.to
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板