python3中使用asyncio和websockets的长延迟

2024-10-01 15:45:55 发布

您现在位置:Python中文网/ 问答频道 /正文

在处理从websocket服务器推送到我的服务器的数据时,我遇到了一个长时间(3小时)的延迟(编辑:最初延迟很短,然后一整天都会变长)客户端.py. 我知道它不会被服务器延迟。在

例如,每隔5秒,我就会看到keep-unu-alive日志事件及其相应的时间戳。所以一切顺利。但是当我看到日志中处理的数据帧实际上是服务器发送它的之后的3个小时。我是否在做什么来延迟这个过程?在

我是不是把我的合作计划叫做“让你活着”对吗?keep-unu-alive只是发送给服务器的一条消息,用于保持连接。服务器回显消息。我是不是日志太多了?这是否会延迟处理(我不这么认为,因为我看到日志事件立即发生)。在

async def keep_alive(websocket):
                """
                 This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
                """
                await websocket.send('Hello')   
                await asyncio.sleep(5)

async def open_connection_test():
    """
    Establishes web socket (WSS). Receives data and then stores in csv.
    """
    async with websockets.connect( 
            'wss://{}:{}@localhost.urlname.com/ws'.format(user,pswd), ssl=True, ) as websocket:
        while True:    
            """
            Handle message from server.
            """
            message = await websocket.recv()
            if message.isdigit():
                # now = datetime.datetime.now()
                rotating_logger.info ('Keep alive message: {}'.format(str(message)))
            else:
                jasonified_message = json.loads(message)
                for key in jasonified_message:
                    rotating_logger.info ('{}: \n\t{}\n'.format(key,jasonified_message[key]))    
                """
                Store in a csv file.
                """
                try:            
                    convert_and_store(jasonified_message)
                except PermissionError:
                    convert_and_store(jasonified_message, divert = True)                        
            """
            Keep connection alive.
            """            
            await keep_alive(websocket)

"""
Logs any exceptions in logs file.
"""
try:
    asyncio.get_event_loop().run_until_complete(open_connection())
except Exception as e:
    rotating_logger.info (e)

编辑: 从documentation-我想这可能与它有关-但我还没有把这些点联系起来。在

The max_queue parameter sets the maximum length of the queue that holds incoming messages. The default value is 32. 0 disables the limit. Messages are added to an in-memory queue when they’re received; then recv() pops from that queue. In order to prevent excessive memory consumption when messages are received faster than they can be processed, the queue must be bounded. If the queue fills up, the protocol stops processing incoming data until recv() is called. In this situation, various receive buffers (at least in asyncio and in the OS) will fill up, then the TCP receive window will shrink, slowing down transmission to avoid packet loss.

编辑:2018年9月28日:我正在测试它,但没有keep-alive消息,这似乎不是问题所在。是否与convert_and_store()函数有关?这是否需要是async def,然后再等待?在

^{pr2}$

编辑10/1/2018:似乎keep alive消息和convert_and_store似乎都有问题;如果我将keep alive消息延长到60秒,convert_and_store将每60秒运行一次。所以convert_and_store正在等待keep_alive()。。。在


Tags: andthetostorein服务器消息编辑
3条回答

您必须为这个keep_alive()函数启动一个新线程。在

对于async-await,它承诺在进入下一步之前所有任务都已完成。在

因此,await keep_alive(websocket)实际上在这个意义上阻塞了线程。您可能不会在这里等待keep_alive以便继续该过程,但可以肯定的是,这不是您想要的,我确定。在

实际上,您需要的是两个时间框架,一个用于与服务器通信,一个用于保持服务器活动。他们应该分开,因为他们是在不同的合作路线。在

所以,正确的方法是使用Thread,在这种情况下不需要使用asyncio,保持简单。在

首先,将keep_alive()改为以下内容。在

def keep_alive():
    """
        This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
    """
    while True:
        websocket.send('Hello') 
        time.sleep(1)

open_connection_test()

^{pr2}$

我认为这会更清楚,使用ThreadPoolExecutor使阻塞代码在后台运行

from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(max_workers=4)

def convert_and_store(data, divert=False, test=False):
    loop = asyncio.get_event_loop()
    loop.run_in_executor(pool, _convert_and_store, divert, test)


def _convert_and_store(data, divert=False, test=False):
    if test:
        data = b
    fields = data.keys()
    file_name = parse_call_type(data, divert=divert)
    json_to_csv(data, file_name, fields)

异步发送保持活动消息演示

^{pr2}$

Could it be related to the convert_and_store() function?

是的,有可能。不应直接调用阻塞代码。如果函数执行CPU密集型计算1秒,则所有异步任务和IO操作都将延迟1秒。在

执行器可用于在不同的线程/进程中运行阻塞代码:

import asyncio
import concurrent.futures
import time

def long_runned_job(x):
    time.sleep(2)
    print("Done ", x)

async def test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        for i in range(5):
            loop.run_in_executor(pool, long_runned_job, i)
            print(i, " is runned")
            await asyncio.sleep(0.5)
loop = asyncio.get_event_loop()
loop.run_until_complete(test())

在你的情况下,应该是这样的:

^{pr2}$

已编辑

It seems that both the keep-alive message and convert_and_store are both at issue

您可以在后台运行keep_alive

async def keep_alive(ws):
    while ws.open:
        await ws.ping(...)   
        await asyncio.sleep(...)

async with websockets.connect(...) as websocket:
    asyncio.ensure_future(keep_alive(websocket))
    while True:    
        ...

相关问题 更多 >

    热门问题