使用aiohttp下载多个文件时处理websocket流

2024-10-02 14:17:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在按照指令(here)在本地机器上镜像Binance Exchange上的多个订单

为了简单起见,我想镜像两个符号的订单:ETHBTC和DOGEBTC(实际上是350+)

首先,我必须缓冲websocket订单更新流:

  • wss://stream.binance.com:9443/stream?streams=ETHBTC@depth@100ms
  • wss://stream.binance.com:9443/stream?streams=DOGEBTC@depth@100ms

现在我必须下载快照:

一旦我有了快照,我就将缓冲区(正在进行中)应用到它们,产生一个状态

之后,所有订单更新都可以简单地应用于状态

对于更新流,我可以执行以下操作:

        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(URL) as wsock:
                async for msg in wsock:
                    if msg.type != aiohttp.WSMsgType.TEXT:
                        J = json.loads(msg.data)
                        symbol = J['data']['s']

                        process_update(symbol, J)

但是,在第一次更新之后,我如何启动下载快照,并使用一个将处理快照的完成处理程序,从而不中断流

如果我跟踪300个符号,那就是300个下载同时发生

我找到了有关异步下载多个文件的资源,但我看不出如何将其与处理流的要求相结合

我总是可以在一个单独的线程中进行下载,但这不是在与aiohttp的架构目标作斗争吗

参考:


Tags: https订单comapistreamasync镜像aiohttp
1条回答
网友
1楼 · 发布于 2024-10-02 14:17:41

回答:感谢IRC Freenode#python上的Graingret🙏

import anyio.to_thread


async def foo():
    async def download(symbol):
        async with session.get(f"{url}/{symbol}") as resp:
            await do_something(resp)

    async with aiohttp.ClientSession() as session, session.ws_connect(
        URL
    ) as wsock, anyio.create_task_group() as tg:
        async for msg in wsock:
            if msg.type != aiohttp.WSMsgType.TEXT:
                J = json.loads(msg.data)
                symbol = J["data"]["s"]

                tg.start_soon(download, symbol)
                await anyio.to_thread.run(process_update, symbol, J)

相关问题 更多 >

    热门问题