我正在尝试向现有的asyncio循环中添加一些代码,以便在Ctrl-C上完全关闭。下面是它所做工作的抽象
import asyncio, signal
async def task1():
print("Starting simulated task1")
await asyncio.sleep(5)
print("Finished simulated task1")
async def task2():
print("Starting simulated task2")
await asyncio.sleep(5)
print("Finished simulated task2")
async def tasks():
await task1()
await task2()
async def task_loop():
try:
while True:
await asyncio.shield(tasks())
await asyncio.sleep(60)
except asyncio.CancelledError:
print("Shutting down task loop")
raise
async def aiomain():
loop = asyncio.get_running_loop()
task = asyncio.Task(task_loop())
loop.add_signal_handler(signal.SIGINT, task.cancel)
await task
def main():
try:
asyncio.run(aiomain())
except asyncio.CancelledError:
pass
#def main():
# try:
# loop = asyncio.get_event_loop()
# loop.create_task(aiomain())
# loop.run_forever()
# except asyncio.CancelledError:
# pass
if __name__ == '__main__':
main()
在本例中,假设task1
和task2
的序列一旦启动就需要完成,否则某些工件将处于不一致的状态。(因此,围绕调用tasks
的asyncio.shield
包装器。)
对于上面的代码,如果我在脚本启动后不久中断脚本,并且它刚刚被打印Starting simulated task1
,那么循环将停止,并且task2
永远不会开始。如果我尝试切换到注释掉的main
版本,那么就永远不会退出,即使循环被正确地取消,并且至少在几分钟内没有进一步的事情发生。它确实取得了一些进展,因为它至少完成了task1
和task2
的任何正在进行的序列
通过头脑风暴,我找到了一些可能的解决方案,尽管我仍然觉得我缺少了一些更简单的东西:
asyncio.shield
创建一个包装器,该包装器递增由asyncio.Condition
对象同步的变量,运行屏蔽函数,然后递减该变量。然后,在CancelledError
处理程序中的aiomain
中,等待变量达到零,然后重新引发异常。(在一个实现中,我可能会使用__aexit__
实现CancelledError
逻辑上的等待零,将这个类的所有部分合并到一个类中。)asyncio
的取消机制,而是使用asyncio.Event
或类似的机制来允许中断点或可中断睡眠。虽然这看起来更具侵入性,要求我指定哪些点被认为是可中断的,而不是声明哪些序列需要屏蔽以避免取消李>
以下是我最终使用的:
与Paul Cornelius的回答类似,这会在允许
CancelledError
向上传播调用链之前插入一个等待子任务完成的过程。但是,它不需要在调用asyncio.shield
的地方以外的地方触摸代码(在我的实际用例中,我有三个循环同时运行,使用一个
asyncio.Lock
来确保一个任务或一系列任务在另一个任务开始之前完成。我在锁上也有一个asyncio.Condition
从一个协程到另一个协程进行通信。当我尝试在aiomain
或main
中等待的方法时)对于要完成的所有屏蔽任务,我遇到了一个问题,取消的父任务释放了锁,然后屏蔽任务尝试使用该锁向条件变量发送信号,给出了一个错误。将获取和释放锁移动到屏蔽任务中也没有意义-这将导致任务B仍按顺序运行:shielded task A启动,task B的协同路由将使其计时器过期,并阻止等待锁Control+C。另一方面,通过将等待置于shield_and_wait
调用点,它巧妙地避免了过早释放锁。)一个警告:似乎
shield_and_wait_decorator
在类方法上不能正常工作这是一个很好的问题。在解答问题的过程中,我学到了一些东西,所以我希望您仍在监视此线程
首先要调查的是,shield()方法是如何工作的?在这一点上,文档至少可以说是令人困惑的。在阅读test_tasks.py中的标准库测试代码之前,我无法理解它。以下是我的理解:
考虑此代码片段:
当执行task_a.cancel()语句时,task_a确实被取消。wait语句立即抛出cancelederror,而不等待任务完成。但task_b继续运行。外部任务(a)停止,但内部任务(b)不停止
下面是您的程序的修改版本,说明了这一点。主要的变化是在CanceledError异常处理程序中插入一个等待,使程序的活动时间延长几秒钟。我在Windows上运行,这就是为什么我也稍微改变了你的信号处理程序,但这只是一个小问题。我还在打印报表中添加了时间戳
您可以看到您的程序无法工作,因为在task1和task2有机会完成之前,task_循环一取消,程序就退出了。他们一直都在那里(或者更确切地说,如果程序继续运行,他们会一直在那里)
这说明了shield()和cancel()是如何交互的,但实际上并不能解决您所说的问题。为此,我认为,您需要有一个可等待的对象,您可以使用它来保持程序的运行,直到重要任务完成。这个对象需要在顶层创建,并沿着堆栈传递到执行重要任务的位置。这是一个与您的程序类似的程序,但是按照您想要的方式执行
我做了三次运行:(1)任务1期间的control-C,(2)任务2期间的control-C,(3)两个任务完成后的control-C。在前两种情况下,程序一直持续到task2完成。在第三个案例中,它立即结束
相关问题 更多 >
编程相关推荐