捕获作业执行期间发生的异常

2024-10-02 22:26:17 发布

您现在位置:Python中文网/ 问答频道 /正文

import functools
import aioschedule as schedule
import asyncio


def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                if cancel_on_failure:
                    return schedule.CancelJob

        return wrapper

    return catch_exceptions_decorator


@catch_exceptions(cancel_on_failure=True)
async def bad_task():
    print(1 / 0)


def main():
    schedule.every().minute.do(bad_task)
    loop = asyncio.get_event_loop()
    while True:
        loop.run_until_complete(schedule.run_pending())


if __name__ == '__main__':
    main()

我希望捕获作业中将发生的异常并停止作业

这里我使用的是aioscheduleaioschedule。 此解决方案从exception handling中选取。
运行上述代码后ZeroDivisionError: division by zero错误继续发生

当我使用scheduleschedule而不是aioschedule时,上面的代码工作得非常好。
但我的方法是异步方法,我使用wait。
任何帮助都将不胜感激。
多谢各位


Tags: importloopreturnfailuremainondefjob
1条回答
网友
1楼 · 发布于 2024-10-02 22:26:17

我在bad_task()的末尾添加了一个await语句,它将等待函数调用完成。因此,如果函数以错误终止,结果将在while循环中捕获,程序将终止

async def bad_task():
    print(1/0)
    await asyncio.sleep(0.1) # waits for your function to execute completely


def main():
    loop = asyncio.get_event_loop()
    while True:
        loop.run_until_complete(bad_task())


if __name__ == '__main__':
    main()

更新:这里有一个更好的方法,可以避免在函数末尾使用await语句

async def bad_task():
    print(1/0)


def main():
    loop = asyncio.get_event_loop()
    while True:
        # collect the promises
        result = asyncio.gather(bad_task())
        loop.run_until_complete(result)

相关问题 更多 >