仅在Python asyncio中处理所有websocket消息后返回

2024-10-03 17:29:04 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用Kucoin(crypto exchange)的流式API发送order_book更新,我正在尝试使用消息处理程序方法处理入站order_bookwebsocket消息,然后只在处理完所有消息后返回,但我认为我设计的模式错误,或者误解了async&;网袋

我想要的是这样的东西-下面的代码不起作用,但为了说明目的,它是我试图实现的,所以请容忍我的蜥蜴大脑在这方面

async for message in websocket:
    message_handled = handler_method(message)  # this updates self.order_book
    if websocket.index(message) + 1 == len(websocket): # check if this is final message
        return self.order_book

正确的方法是什么,这样所有的message在返回之前都由handler_method()处理

编辑:

请参阅websocket的调试日志示例

client - event = data_received(<1016 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741271,"change":"2346.05,buy,120","timestamp":1627660760709},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741272,"change":"2345.1,buy,33","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741273,"change":"2346.1,buy,3360","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741274,"change":"2346.15,sell,0","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741275,"change":"2346.85,sell,0","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741276,"change":"2346.05,buy,540","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client - event = data_received(<849 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741277,"change":"2344.8,buy,2145","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741278,"change":"2344.8,buy,1667","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741279,"change":"2346.1,buy,3780","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741280,"change":"2344.8,buy,1189","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741281,"change":"2344.8,buy,711","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client - event = data_received(<1690 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741282,"change":"2346.05,buy,120","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741283,"change":"2338.1,buy,407","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741284,"change":"2360.25,sell,9681","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741285,"change":"2337.8,buy,20","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741286,"change":"2338.1,buy,6","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741287,"change":"2334.1,buy,0","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741288,"change":"2346.1,buy,3360","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741289,"change":"2346.2,buy,120","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741290,"change":"2346.1,buy,3780","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741291,"change":"2354.3,sell,0","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)```

Tags: clientfalsetruemessagedatatopicchangeframe
2条回答

看来你在设计这个东西,所以让我给你一些提示。首先,在网络中,通常是将一条消息分块发送。但是为了让它正常工作,你需要做一些额外的事情。最重要的是:您需要边界,这是一种与您的服务器通信的方式,即此分块消息的开始和结束

例如,您可以假设消息是按严格的顺序来的:对于给定的数据集X和块a_1,…,a_k您设计的协议首先发送“新数据集和k块”消息,然后依次发送每个块a_i。这类似于HTTP及其“Content-Length:”头,后面是具体长度的内容。一种变体是,您发送“新数据集”消息,然后依次发送所有A_i和“数据集结束”消息。这就是HTTP和“传输编码:分块”头的工作原理

另一种方法是发送“带有k个块的Id T的新数据集”消息,然后将Id“T”添加到每个块中。这允许您异步处理事情(例如,同时交错两个数据集)。因此,它显然是有益的,但很难正确实施

一旦设计了协议,就必须实现服务器端。根据不同的方法,它将有不同的实现。例如,对于顺序场景,这非常简单。下面是一个伪代码:

while True:
    message = await websocket.receive()  # <  assumed to be the initial "new data set" message

    # TODO: break if disconnected or invalid message

    total_chunks_length = get_chunks_length(message)
    total_processed_data = []
    while total_chunks_length > 0:
        message = await websocket.receive()
        processed_data = await handler_method(message)
        total_processed_data.append(processed_data)
        total_chunks_length -= 1

    await websocket.send(total_processed_data)

对于第二种异步设计,这变得更加困难,因为您必须跟踪ID。我这里不详细讲了,希望你能想出一个解决办法

我不确定这对将来的其他人是否有帮助,但我最终解决了这个问题,方法是调用websocket.recv(),然后使用while循环直接处理ws.messages消息队列。请参见以下示例:

message = json.loads(await ws.recv())
handler_method(message)

while len(ws.messages) > 0:
    message = json.loads(ws.messages.popleft())
    handler_method(message)

return something

它的作用是:recv从websocket连接接收所有消息,将它们加载到ws.messages队列(一个collections.deque对象)中,并调用ws.messages.popleft()一次,从而生成要处理的单个消息。然后,我们只需在while循环中调用ws.messages.popleft(),直到没有剩余消息为止,最后在while循环结束后调用return

相关问题 更多 >