如何在zmq的推/拉模式中设置hwm?

2024-10-01 07:37:20 发布

您现在位置:Python中文网/ 问答频道 /正文

我发现了一个类似的问题,ZeroMQ: HWM on PUSH does not work,但它不能解决我的问题。在

我想控制推送套接字排队的消息数,但它不起作用,仍将排队1000条消息。
所以我想知道如何设置推插座的hwm。提前谢谢。在

我的环境是:libzmq 4.0.4、pyzmq 14.1.0、python3.3

我的代码是:

在服务器.py在

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import random

import zmq


class TestPush(object):

    def __init__(self):
        self.ctx = zmq.Context()

        random.seed()

    def run(self):
        task_snd = self.ctx.socket(zmq.PUSH)
        task_snd.setsockopt(zmq.SNDHWM, 10)
        task_snd.bind('tcp://*:53000')        

        while True:
            workload = str(random.randint(1, 100))
            task_snd.send(workload.encode('utf-8'))
            print('Send {0}'.format(workload))


if __name__ == '__main__':
    test_push = TestPush()
    test_push.run()

在客户端.py在

^{pr2}$

Tags: pyimportself消息taskdefrandomzmq
1条回答
网友
1楼 · 发布于 2024-10-01 07:37:20

当我试图在push-and-pull套接字上设置HWM(高水位线)时,我在ZeroMQ中遇到了类似的问题。甚至,它也不能与酒吧和酒吧的插座一起工作。我想解释一下发生了什么事,它是如何解决的。在

我做了两个脚本,首先作为一个带有推套接字的发送方,另一个作为带有拉套接字的接收方。将两个插座上的HWM设置为10。在receiver脚本中,我在每个消息接收后设置了5秒的延迟。然后,我用100条消息循环运行sender脚本(在保持接收方运行以接收消息之后)。在

我所期望的:

接收方队列和发送方队列将到达hwm。之后,发送者将停止发送更多的消息。在

但是发生了什么:

发送者发送所有100条消息并退出。但是接收者在很长一段时间内一直在一个接一个地处理消息,直到接收到所有的消息。在

经过研究,我找到了原因:

有一种东西叫做内核套接字缓冲区,它位于发送方套接字和接收方套接字之间。每当一个进程打开一个套接字时,内核就会为tcp套接字分配内存空间,默认情况下是128KB。内核套接字缓冲区适用于发送方和接收方套接字(因此总缓冲区为128KB+128KB)。我的消息大小以字节为单位(一个包含一些字符的int)。因此,总的消息缓冲如下:

缓冲区消息总数= 发送器插座hwm +发送方套接字的内核套接字缓冲区(128KB) +接收器插座hwm +发送方套接字的内核套接字缓冲区(128KB)

解决方案:

现在,我把我的信息长度改为1KB多一点。然后再次执行测试,发现大约260条消息已发送(如预期),之后发送方会在间隔时间内停止,直到接收方接收到一些消息并再次启动。在


附加信息

为了使push socket在接收方无法接收的情况下继续发送消息,我们可以在send例程中使用NOBLOCK选项,但这样接收方丢失的消息数量会增加很多。所以,更好的选择是使用Poll或timeout,然后使用NOBLOCK选项调用send例程。在

请注意,您可以使用zeromq的SNDBUFF/RCVBUFF,但操作系统可能不遵守它(在我的例子中,它不起作用)。在

相关问题 更多 >