0%

收集多个可等待对象的结果:asyncio.gather()

asyncio.gather 的核心功能是并发运行多个可等待对象(awaitables),并等待它们全部完成后收集结果

当你需要同时执行多个独立的异步操作(比如并发发出多个网络请求或数据库查询)并一次性获取所有结果时,asyncio.gather 是理想的选择。

asyncio.gather 的主要作用

  1. 并发执行gather 接收一个或多个可等待对象(如协程、任务或 Future)作为参数,并将它们并发地在事件循环中调度执行。 如果传入的是协程,gather 会自动将其包装成 asyncio.Task

  2. 等待完成await asyncio.gather(...) 会阻塞当前任务,直到所有传入的可等待对象都执行完毕。

  3. 收集结果:一旦所有任务都成功完成,gather 会返回一个列表,其中包含了每个任务的返回值。 重要的是,返回结果的顺序与你传入可等待对象的顺序完全一致,而与它们的实际完成顺序无关。

如何使用 asyncio.gather

最常见的使用方式是将多个协程调用作为参数传递给 gather

基础代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import time

async def fetch_data(source, delay):
"""一个模拟从不同来源获取数据的协程"""
print(f"开始从 {source} 获取数据...")
await asyncio.sleep(delay)
result = f"来自 {source} 的数据"
print(f"完成从 {source} 获取数据")
return result

async def main():
start_time = time.time()
print("同时启动多个任务...")

# 使用 asyncio.gather 并发运行两个协程
results = await asyncio.gather(
fetch_data("API", 2),
fetch_data("数据库", 3)
)

end_time = time.time()

print(f"\n所有任务完成,总耗时: {end_time - start_time:.2f} 秒")
print(f"获取到的结果: {results}")

if __name__ == "__main__":
asyncio.run(main())

运行结果分析

  • 两个 fetch_data 任务会并发执行。
  • 程序不会等待2秒再等待3秒(总共5秒),而是会等待最长的那个任务完成,也就是3秒。
  • 最终的输出结果 results 会是一个列表 ['来自 API 的数据', '来自 数据库的数据'],顺序与调用 gather 时的参数顺序一致。

异常处理

asyncio.gather 在处理异常时有两种模式,由 return_exceptions 参数控制。

默认行为 (return_exceptions=False)

默认情况下,如果 gather 中有任何一个任务引发了异常,这个异常会立即被传播到 await asyncio.gather(...) 的调用处。 这意味着:

  • 你的程序会立即因为这个异常而中断(除非你用 try...except 捕获它)。
  • 其他仍在运行的任务不会被自动取消,它们会继续在后台运行直到完成或事件循环结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def failing_coroutine():
await asyncio.sleep(1)
raise ValueError("任务发生错误!")

async def main_fail():
try:
await asyncio.gather(
fetch_data("API", 2),
failing_coroutine()
)
except ValueError as e:
print(f"\n在 gather 中捕获到异常: {e}")

# 即使捕获了异常,API任务可能仍在后台运行
await asyncio.sleep(2)
print("主程序结束")

# 运行 asyncio.run(main_fail())

运行结果分析

  1. failing_coroutine 在1秒后抛出 ValueError
  2. gather 立即将这个异常传播出来,并被 try...except 块捕获。
  3. fetch_data("API", 2) 任务不会被取消,它会继续运行并在2秒后打印完成信息。

收集异常 (return_exceptions=True)

当你需要确保所有任务都执行完毕,无论它们是否成功,并且想要检查每个任务的结果时,可以将 return_exceptions 设置为 True

在这种模式下:

  • gather 不会传播异常,而是会像对待成功结果一样“收集”它们。
  • 返回的列表中,成功任务的位置是其返回值,而失败任务的位置则是该异常对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def main_return_exceptions():
results = await asyncio.gather(
fetch_data("API", 2),
failing_coroutine(),
return_exceptions=True
)

print("\n使用 return_exceptions=True 的结果:")
for result in results:
if isinstance(result, Exception):
print(f" - 任务失败,异常: {result}")
else:
print(f" - 任务成功,结果: '{result}'")

# 运行 asyncio.run(main_return_exceptions())

运行结果分析
程序会等待所有任务完成(大约2秒),然后 results 列表会包含类似 ['来自 API 的数据', ValueError('任务发生错误!')] 的内容。你可以遍历这个列表来分别处理成功和失败的情况。

asyncio.gather vs asyncio.wait

asyncio 还有一个 asyncio.wait() 函数,它与 gather 类似但用途不同:

  • 返回值gather 直接返回结果列表;wait 返回两组 Task 对象:已完成的(done)和未完成的(pending)。你需要自己从已完成的任务中提取结果或异常。
  • 灵活性wait 更加灵活,可以配置为在第一个任务完成或第一个任务出现异常时就返回,而 gather 总是等待所有任务完成。
  • 易用性:对于“运行一堆任务并获取所有结果”这个常见用例,gather 的接口更简单直接。