如何从任务生成器创建异步任务执行机制

本文介绍了如何利用Python的asyncio库,结合任务生成器,实现异步任务的执行。重点在于避免使用await直接等待任务完成,而是通过create_task创建任务并将其添加到事件循环中,并通过asyncio.sleep(0)或TaskGroup等机制,确保事件循环能够调度其他任务,从而实现真正的异步并发执行。文章提供了详细的代码示例和解释,帮助读者理解并掌握异步任务处理的关键技巧。

在异步编程中,有时我们需要从一个任务生成器中不断产生新的任务,并将这些任务添加到事件循环中异步执行,而不需要显式地使用await等待每个任务完成。这种模式在处理诸如长轮询服务器接收事件并异步处理的场景中非常有用。

关键在于,虽然我们调用了loop.create_task或tg.create_task创建了任务,但如果不主动让出控制权,事件循环可能无法及时调度这些新创建的任务。这会导致程序看似异步,但实际上仍然是同步执行,无法发挥异步并发的优势。

以下是一个示例,展示了如何从任务生成器创建异步任务执行机制:

import asyncio, random

async def wrapper(word: str):
    print(f"开始处理: {word}")
    await asyncio.sleep(1) # 模拟耗时操作
    print(f"处理完成: {word}")

def generator():
    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
    while True:
        yield random.choice(abc)

async def manager():
    #loop = asyncio.get_event_loop() # TaskGroup 不需要显式获取 loop
    try:
        async with asyncio.TaskGroup() as tg:
            for letter in generator():
                tg.create_task(wrapper(letter)) # 创建异步任务
                await asyncio.sleep(0) # 关键:让出控制权,允许事件循环调度其他任务
    except ExceptionGroup as eg:
        print(f"发生异常: {eg}")

asyncio.run(manager())

代码解释:

  1. wrapper(word: str): 这是一个协程函数,模拟一个耗时任务,接收一个字符串 word 作为输入,打印开始和结束信息,并使用 asyncio.sleep(1) 模拟耗时操作。
  2. generator(): 这是一个生成器函数,无限循环地生成随机字母。
  3. manager(): 这是管理协程,负责从生成器获取字母,并创建相应的异步任务。
    • async with asyncio.TaskGroup() as tg:: 使用TaskGroup来管理一组相关的任务。TaskGroup是Python 3.11引入的新特性,可以更方便地管理和控制一组异步任务。如果任何一个任务抛出异常,整个TaskGroup会被取消。
    • tg.create_task(wrapper(letter)): 使用TaskGroup的create_task方法来创建异步任务,并将 wrapper(letter) 协程添加到事件循环中。
    • await asyncio.sleep(0): 这是关键的一步。asyncio.sleep(0) 会立即将控制权交还给事件循环,允许事件循环调度其他就绪的任务。如果不加这句,manager() 协程会一直循环创建任务,而不会给其他任务执行的机会,导致程序看似异步,实际上是同步执行。

注意事项:

总结:

通过结合任务生成器和 asyncio.create_task 或 TaskGroup.create_task,可以方便地创建和管理异步任务。关键在于,要确保在循环中定期让出控制权,例如使用 await asyncio.sleep(0),以便事件循环能够及时调度其他任务,从而实现真正的异步并发执行。使用TaskGroup可以更加方便地管理和控制一组相关的异步任务,并自动处理任务中的异常。

本文转载于:互联网 如有侵犯,请联系zhengruancom@outlook.com删除。
免责声明:正软商城发布此文仅为传递信息,不代表正软商城认同其观点或证实其描述。