Python异步重启协同程序

2024-10-02 16:27:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我是一名新的python程序员,正在尝试编写一个“bot”来为自己在betfair上进行交易。(雄心勃勃!!!!)

我遇到的问题是——我有运行asyncio事件循环的基本知识,但我注意到,如果其中一个协程在其进程中失败(例如API调用失败或mongodb读取失败),那么asyncio事件循环将继续运行,但忽略一个失败的协程

我的问题是,我如何能够自动重新启动一个协程,或者处理一个错误来停止整个asyncio循环,但此时一切运行起来似乎都忘记了一个事实,即有些事情不正确,其中一部分失败了。在我的例子中,在数据库读取未成功后,循环从未返回到“rungetcompetitionids”函数,并且它从未再次返回到函数,即使它处于while true循环中 usergui还不能正常工作,但只能尝试异步IO

谢谢 克莱夫

import sys
import datetime
from login import sessiontoken as gst
from mongoenginesetups.setupmongo import global_init as initdatabase
from asyncgetcompetitionids import competition_id_pass as gci
from asyncgetcompetitionids import create_comp_id_table_list as ccid
import asyncio
import PySimpleGUI as sg

sg.change_look_and_feel('DarkAmber')

layout = [
    [sg.Text('Password'), sg.InputText(password_char='*', key='password')],
    [sg.Text('', key='status')],
    [sg.Button('Submit'), sg.Button('Cancel')]
]
window = sg.Window('Betfair', layout)


def initialisethedatabase():
    initdatabase('xxxx', 'xxxx', xxxx, 'themongo1', True)


async def runsessiontoken():
    nextlogontime = datetime.datetime.now()
    while True:
        returned_login_time = gst(nextlogontime)
        nextlogontime = returned_login_time
        await asyncio.sleep(15)


async def rungetcompetitionids(compid_from_compid_table_list):
    nextcompidtime = datetime.datetime.now()
    while True:
        returned_time , returned_list = gci(nextcompidtime, compid_from_compid_table_list)
        nextcompidtime = returned_time
        compid_from_compid_table_list = returned_list
        await asyncio.sleep(10)


async def userinterface():
    while True:
        event, value = window.read(timeout=1)
        if event in (None, 'Cancel'):
            sys.exit()
        if event != "__TIMEOUT__":
            print(f"{event} {value}")
        await asyncio.sleep(0.0001)


async def wait_list():
    await asyncio.wait([runsessiontoken(), 
                       rungetcompetitionids(compid_from_compid_table_list), 
                       userinterface()
                      ])


initialisethedatabase()
compid_from_compid_table_list = ccid()
print(compid_from_compid_table_list)
nextcompidtime = datetime.datetime.now()
print(nextcompidtime)
loop = asyncio.get_event_loop()
loop.run_until_complete(wait_list())
loop.close()

Tags: fromimporteventasynciodatetimedefastable
1条回答
网友
1楼 · 发布于 2024-10-02 16:27:48

一个简单的解决方案是使用一个包装函数(或“supervisor”)捕获Exception,然后盲目地重试该函数。更优雅的解决方案包括出于诊断目的打印异常和堆栈跟踪,以及查询应用程序状态以查看尝试并继续是否有意义。例如,如果betfair告诉您您的帐户未经授权,那么继续下去就毫无意义。如果这是一个普遍的网络错误,那么立即重试可能是值得的。如果主管注意到它在很短的时间内重新启动了很多次,您也可能希望停止重试

例如

import asyncio
import traceback
import functools
from collections import deque
from time import monotonic

MAX_INTERVAL = 30
RETRY_HISTORY = 3
# That is, stop after the 3rd failure in a 30 second moving window

def supervise(func, name=None, retry_history=RETRY_HISTORY, max_interval=MAX_INTERVAL):
    """Simple wrapper function that automatically tries to name tasks"""
    if name is None:
        if hasattr(func, '__name__'): # raw func
            name = func.__name__
        elif hasattr(func, 'func'): # partial
            name = func.func.__name__
    return asyncio.create_task(supervisor(func, retry_history, max_interval), name=name)


async def supervisor(func, retry_history=RETRY_HISTORY, max_interval=MAX_INTERVAL):
    """Takes a noargs function that creates a coroutine, and repeatedly tries
        to run it. It stops is if it thinks the coroutine is failing too often or
        too fast.
    """
    start_times = deque([float('-inf')], maxlen=retry_history)
    while True:
        start_times.append(monotonic())
        try:
            return await func()
        except Exception:
            if min(start_times) > monotonic() - max_interval:
                print(
                    f'Failure in task {asyncio.current_task().get_name()!r}.'
                    ' Is it in a restart loop?'
                )
                # we tried our best, this coroutine really isn't working.
                # We should try to shutdown gracefully by setting a global flag
                # that other coroutines should periodically check and stop if they
                # see that it is set. However, here we just reraise the exception.
                raise
            else:
                print(func.__name__, 'failed, will retry. Failed because:')
                traceback.print_exc()


async def a():
    await asyncio.sleep(2)
    raise ValueError


async def b(greeting):
    for i in range(15):
        print(greeting, i)
        await asyncio.sleep(0.5)


async def main_async():
    tasks = [
        supervise(a),
        # passing repeated argument to coroutine (or use lambda)
        supervise(functools.partial(b, 'hello'))
    ]
    await asyncio.wait(
        tasks,
        # Only stop when all coroutines have completed
        #   this allows for a graceful shutdown
        # Alternatively use FIRST_EXCEPTION to stop immediately 
        return_when=asyncio.ALL_COMPLETED,
    )
    return tasks


def main():
    # we run outside of the event loop, so we can carry out a post-mortem
    # without needing the event loop to be running.
    done = asyncio.run(main_async())
    for task in done:
        if task.cancelled():
            print(task, 'was cancelled')
        elif task.exception():
            print(task, 'failed with:')
            # we use a try/except here to reconstruct the traceback for logging purposes
            try:
                task.result()
            except:
                # we can use a bare-except as we are not trying to block
                # the exception   just record all that may have happened.
                traceback.print_exc()

main()

这将产生如下输出:

hello 0
hello 1
hello 2
hello 3
a failed, will retry. Failed because:
Traceback (most recent call last):
  File "C:\Users\User\Documents\python\src\main.py", line 30, in supervisor
    return await func()
  File "C:\Users\User\Documents\python\src\main.py", line 49, in a
    raise ValueError
ValueError
hello 4
hello 5
hello 6
hello 7
a failed, will retry. Failed because:
Traceback (most recent call last):
  File "C:\Users\User\Documents\python\src\main.py", line 30, in supervisor
    return await func()
  File "C:\Users\User\Documents\python\src\main.py", line 49, in a
    raise ValueError
ValueError
hello 8
hello 9
hello 10
hello 11
Failure in task 'a'. Is it in a restart loop?
hello 12
hello 13
hello 14
 exception=ValueError()> failed with:
Traceback (most recent call last):
  File "C:\Users\User\Documents\python\src\main.py", line 84, in main
    task.result()
  File "C:\Users\User\Documents\python\src\main.py", line 30, in supervisor
    return await func()
  File "C:\Users\User\Documents\python\src\main.py", line 49, in a
    raise ValueError
ValueError

相关问题 更多 >