ZeroMQ XPUB recv()是否是查找是否存在订阅者并解决慢速加入者综合症的解决方案?

2024-09-30 02:33:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我的用例:

  1. 订阅者将是一个服务器(绑定到一个端口),它将等待来自多个发布者的消息
  2. 发布服务器将在不同线程内初始化为客户端(连接到端口)
  3. 每个线程中要发布的数据将是两条消息
  4. 当用户连接时,尽快获取每条消息是很重要的
  5. 如果订阅服务器未连接,则我不希望阻止发布服务器线程,理想情况下,在1-2秒左右的时间内它可以正常工作

慢加入问题:

运行1000多个线程(发布服务器)仅1到2次,我就可以获得订阅服务器中的所有数据。 增加几毫秒的睡眠时间就解决了这个问题,因此我99.9%确信我是众所周知的慢连接综合症的受害者。但是,在我的情况下,睡眠解决方案不是一个好的解决方案,因为publisher的连接时间可能是可变的,我希望尽快将数据发送给订阅服务器

我关于解决此问题的想法和实验代码:

我的解决方案基于使用XPUB recv方法。使用XPUB初始化发布服务器,并将RCVTIMEO设置为1000ms。在发布服务器连接之后,我添加了一个recv()调用来检查是否有订户。当我收到subscribe消息时,我知道连接已经完成,我可以发送数据而不会丢失任何数据(除非订阅者发生错误,但我不在乎)

如果我没有收到任何订阅消息,那么In 1000msrecv()超时,线程终止

下面是python(pyzmq)中的示例代码,用于测试此实现(对于publisher,我不使用线程,而是使用while循环并同时运行多个publisher),它可以按照我的要求工作:

publisher.py:

import zmq

def main():
    """ main method """

    i = 0
    while True:
        # Prepare context and publisher
        context = zmq.Context()
        publisher = context.socket(zmq.XPUB)
        publisher.connect("tcp://0.0.0.0:5650")
        publisher.setsockopt(zmq.RCVTIMEO, 1000)

        # Waiting for 1000ms to get a subscription
        i = i + 1
        try:
            publisher.recv()
            # Send the message
            publisher.send_multipart([b'test', bytes(str(i),'utf-8')])
        except Exception as e:
            print(e, flush=True)

        # Terminate socket and context
        publisher.close()
        context.term()
        if i >= 10000:
            break

if __name__ == "__main__":
    main()    

subscriber.py:

import zmq

def main():
    """ main method """

    # Prepare our context and subscriber
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    uri = "tcp://0.0.0.0:5650"
    subscriber.bind(uri)
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    print('Subscriber connects to %s' % (uri), flush=True)

    # Receive messages
    i = 0
    while True:
        [topic, data] = subscriber.recv_multipart()
        i = i + 1
        print("%s: %s %s" % (i, topic, data), flush=True)

if __name__ == "__main__":
    main()

我的问题:

解决方案有那么简单吗?如果有订阅服务器处于活动状态,我是否遗漏了任何会导致数据丢失的内容(与慢速连接相关)


Tags: and数据服务器true消息maincontextzmq
1条回答
网友
1楼 · 发布于 2024-09-30 02:33:10

Q : "Is the solution that simple?"

恰恰相反。对于上面发布的内容,解决方案是过于复杂的w.r.t.到目前为止发布的用例需求

a)鉴于上述要求,有可能消除与安装和维护相关的所有成本;在属于同一进程的同一主机上的同一线程之间进行通信时,维护ISO-OSI-L3tcp://传输类。相反,使用超快、无堆栈、内存映射的inproc://传输类可以避免所有这些低效。ZeroMQ API v4.0+在设置inproc://-TransportClass { .bind() | .connect() }-外观顺序方面也没有其他条件,因此我们可以享受零拷贝“传输”消息的最大MEM映射超低延迟标记(不移动RAM中的一个字节数据)-酷,不是吗?(除非需要注入MITM协议嗅探,否则请删除tcp://过杀)

b)考虑到上述要求,在“静态”端订阅所有消息的情况下,一对消息的传递是PUB/SUB可伸缩正式通信模式原型的极为低效的使用。您的代码必须支付所有费用来设置一个新的SUB-实例,然后它爬网来设置一个有效的连接(在tcp://-TransportClass的堆栈上,希望在a下删除),下一步要设置一个新的主题过滤器(无论是在早期版本中的SUB端还是在较新的ZeroMQ版本中的PUB端进行操作,都要付出巨大的代价,因为这样做只是为了接收所有消息,即根本不进行过滤)。同样的正式服务可以通过更轻量级的多个节点来实现-PUSH/PULL-在一个节点上。如果不需要任何反向/双向/更复杂的正式通信,那么这一个PUSH/PULL就可以完成请求的任务

c)考虑到上述要求,您的重点似乎放在通过连接过早发送消息而不丢失消息上。在ZeroMQ设置中,有一些工具可以确定这一点,但您却不小心使用它们:

  • 使用zmq.IMMEDIATE可能会在没有现成连接工作(或从未工作)的情况下使用AccessNode的阻塞状态
  • 使用返回码&errno(或zmq.errno()对于POSIX不兼容的操作系统/Win32等)处理可能有助于您的代码检测&;在的整个生命周期内,对“自治代理网络”中发生的任何和所有特定情况作出反应(无论代理实际上是“物理”分布还是共同定位,如此处所示)。不失去控制是这里的核心责任。什么是控制代码,它在失去控制的状态下自锁,甚至无法控制自己;)

d)切勿使用{ .recv() | .send() | .poll() | ... }-方法的阻塞形式。教科书中的例子与专业的信令/消息传递元平面实现应该是什么样子完全相反。事实上,从未-参考上述第5)项

e)更好地重复使用Context()实例,而不是像上面所描述的那样使其成为可消费/一次性的。线程可以自由地共享一个预实例化的Context()引擎,避免下一个大量重复的附加开销,如果每个分叉的线程都重新实例化一个可消费的/一次性的Context(),那么就只需要一个短期的对等客户端线程

f)如果有人知道更好的解决方案,请随时通知我们:o)

评论中的问题

a)
Subscriber will be on another machine, so I think tcp:// is the solution.*

当然,这里是NP{ pgm:// | epgm:// | tipc:// }-如果进一步向更高的性能级别发展,这里的传输可能会很有趣

b)
Subscriber is going to forward via an XPUB socket the messages to other subscribers. PUSH/PULL could work but if I want to pass those subscriptions and their filters to the initial publishers and filter some messages at the source, I have to use PUB/SUB pattern.

嗯,在O/p中没有提到。任何XPUBs/XSUBs的分层都可能工作得很好,问题在于连接管理级别

c)
Just to clarify, not losing messages is important only when there is a subscriber. Could you explain this part a little more?

当然,由于RTO连接的链路上没有可用的订阅者,随时准备“通过线路”立即发送,因此任何消息都无法发送(并且可以无声地丢弃,这就是wha)你不试着去反抗吗。这就是zmq.IMMEDIATE可以通过调用.setsockopt()-方法来管理的

相关问题 更多 >

    热门问题