0%

保护协程不被取消:asyncio.shield()

asyncio.shield() 用于保护一个协程(awaitable object)免受取消操作的影响。当一个任务(Task)被取消时,asyncio 会在其内部引发一个 CancelledError 异常,这通常会导致任务的执行被中断。然而,在某些情况下,我们希望确保某些关键操作即使在外部请求取消的情况下也能执行完毕,这时 asyncio.shield() 就派上了用场。

asyncio.shield 的作用

asyncio.shield() 的核心作用是保护一个可等待对象(如协程或任务)不被取消。它通过将可等待对象包装在一个特殊的 Future 对象中来实现这一点,这个 Future 对象会“吸收”取消请求。

具体来说:

  • 当一个被 shield() 保护的任务的外部封装(即 shield() 返回的 Future)被取消时,这个取消请求不会传播到内部被保护的任务中。
  • 对于发起取消请求的代码来说,看起来取消操作是成功的,因为等待被保护任务的地方会立即收到 CancelledError 异常。
  • 然而,被保护的内部任务实际上会继续在后台运行,直到它自然完成。

这在执行一些不能被中断的关键操作时非常有用,例如:

  • 资源清理:确保文件句柄、网络连接等资源被正确关闭,即使主任务被取消。
  • 数据完整性:在退出前完成重要的数据写入或状态更新,防止数据损坏。
  • 优雅关闭:在应用程序关闭时,保证一些清理或回滚操作能够顺利完成。

如何使用 asyncio.shield

asyncio.shield() 的语法很简单,它接受一个可等待对象作为参数:

1
asyncio.shield(aw)

如果 aw 是一个协程,它会自动被调度为一个任务来运行。

代码示例

下面通过一个例子来直观地展示 asyncio.shield() 的效果。我们将创建一个长时间运行的任务,并尝试在它完成前取消它,分别在有和没有 shield() 保护的情况下进行对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import asyncio
import time

async def critical_operation():
"""一个模拟的、不应被中断的关键操作"""
print("关键操作开始,需要5秒钟完成...")
try:
for i in range(5):
print(f"关键操作正在进行中... {i+1}/5")
await asyncio.sleep(1)
print("关键操作成功完成!")
return "操作结果"
except asyncio.CancelledError:
print("关键操作被取消了(这不应该发生)!")
raise

async def main():
# 创建一个关键操作的任务
task = asyncio.create_task(critical_operation())

# 使用 shield 保护任务
shielded_task = asyncio.shield(task)

# 运行另一个任务,它将在1秒后尝试取消被保护的任务
canceller = asyncio.create_task(cancel_after(shielded_task, 1))

try:
# 等待被保护的任务
# 当 canceller 取消 shielded_task 时,这里会立即抛出 CancelledError
result = await shielded_task
print(f"从受保护的任务中获取结果: {result}")
except asyncio.CancelledError:
print("主协程捕获到 CancelledError,但内部任务应该还在运行。")

# 尽管 shielded_task 已经被“取消”,但原始的 task 应该仍在运行
# 我们可以等待原始任务完成来验证这一点
print("等待原始任务完成...")
final_result = await task # 等待内部任务最终完成
print(f"原始任务最终完成,结果是: '{final_result}'")
print(f"原始任务是否被取消: {task.cancelled()}")


async def cancel_after(task_to_cancel, delay):
"""在一个延迟后取消指定的任务"""
await asyncio.sleep(delay)
print(f"{delay}秒后,尝试取消任务...")
was_cancelled = task_to_cancel.cancel()
print(f"取消请求是否成功发出: {was_cancelled}")

if __name__ == "__main__":
asyncio.run(main())

运行结果分析:

  1. critical_operation 开始执行并打印 “关键操作开始…”。
  2. main 函数创建了这个任务,并用 asyncio.shield() 将其包裹。
  3. canceller 任务在一秒后调用 shielded_task.cancel()。取消请求被 shield() 创建的 Future 吸收了。
  4. main 函数中,await shielded_task 立即抛出 CancelledError,打印 “主协程捕获到 CancelledError…”。这让调用者以为任务已经被取消了。
  5. 然而,内部的 critical_operation 并没有收到 CancelledError,它会继续执行,打印后续的 “关键操作正在进行中…” 直到完成。
  6. 最后,await task 会成功地等待原始任务完成,并获取其返回值,证明了它并未被中断。

注意事项和限制

  • shield() 并非万能shield() 只能防止其包裹的任务被包含它的协程的取消操作所影响。如果有人直接获得了对内部任务的引用并取消它,那么 shield() 将无法阻止。
  • 优雅关闭的复杂性:在处理像 Ctrl+C (SIGINT) 这样的程序中断信号时,通常会取消所有正在运行的任务。在这种情况下,asyncio.shield() 可能不足以保护任务,因为内部任务本身也可能被直接取消。需要更复杂的信号处理和关闭逻辑来实现真正的优雅关闭。