我的用例:
慢加入问题:
运行1000多个线程(发布服务器)仅1到2次,我就可以获得订阅服务器中的所有数据。 增加几毫秒的睡眠时间就解决了这个问题,因此我99.9%确信我是众所周知的慢连接综合症的受害者。但是,在我的情况下,睡眠解决方案不是一个好的解决方案,因为publisher的连接时间可能是可变的,我希望尽快将数据发送给订阅服务器
我关于解决此问题的想法和实验代码:
我的解决方案基于使用XPUB recv方法。使用XPUB初始化发布服务器,并将RCVTIMEO设置为1000ms。在发布服务器连接之后,我添加了一个recv()
调用来检查是否有订户。当我收到subscribe消息时,我知道连接已经完成,我可以发送数据而不会丢失任何数据(除非订阅者发生错误,但我不在乎)
如果我没有收到任何订阅消息,那么In 1000msrecv()
超时,线程终止
下面是python(pyzmq)中的示例代码,用于测试此实现(对于publisher,我不使用线程,而是使用while循环并同时运行多个publisher),它可以按照我的要求工作:
publisher.py:
import zmq
def main():
""" main method """
i = 0
while True:
# Prepare context and publisher
context = zmq.Context()
publisher = context.socket(zmq.XPUB)
publisher.connect("tcp://0.0.0.0:5650")
publisher.setsockopt(zmq.RCVTIMEO, 1000)
# Waiting for 1000ms to get a subscription
i = i + 1
try:
publisher.recv()
# Send the message
publisher.send_multipart([b'test', bytes(str(i),'utf-8')])
except Exception as e:
print(e, flush=True)
# Terminate socket and context
publisher.close()
context.term()
if i >= 10000:
break
if __name__ == "__main__":
main()
subscriber.py:
import zmq
def main():
""" main method """
# Prepare our context and subscriber
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
uri = "tcp://0.0.0.0:5650"
subscriber.bind(uri)
subscriber.setsockopt(zmq.SUBSCRIBE, b'')
print('Subscriber connects to %s' % (uri), flush=True)
# Receive messages
i = 0
while True:
[topic, data] = subscriber.recv_multipart()
i = i + 1
print("%s: %s %s" % (i, topic, data), flush=True)
if __name__ == "__main__":
main()
我的问题:
解决方案有那么简单吗?如果有订阅服务器处于活动状态,我是否遗漏了任何会导致数据丢失的内容(与慢速连接相关)
恰恰相反。对于上面发布的内容,解决方案是过于复杂的w.r.t.到目前为止发布的用例需求
a)鉴于上述要求,有可能消除与安装和维护相关的所有成本;在属于同一进程的同一主机上的同一线程之间进行通信时,维护ISO-OSI-L3
tcp://
传输类。相反,使用超快、无堆栈、内存映射的inproc://
传输类可以避免所有这些低效。ZeroMQ API v4.0+在设置inproc://
-TransportClass{ .bind() | .connect() }
-外观顺序方面也没有其他条件,因此我们可以享受零拷贝“传输”消息的最大MEM映射超低延迟标记(不移动RAM中的一个字节数据)-酷,不是吗?(除非需要注入MITM协议嗅探,否则请删除tcp://
过杀)b)考虑到上述要求,在“静态”端订阅所有消息的情况下,一对消息的传递是
PUB/SUB
可伸缩正式通信模式原型的极为低效的使用。您的代码必须支付所有费用来设置一个新的SUB
-实例,然后它爬网来设置一个有效的连接(在tcp://
-TransportClass的堆栈上,希望在a下删除),下一步要设置一个新的主题过滤器(无论是在早期版本中的SUB端还是在较新的ZeroMQ版本中的PUB端进行操作,都要付出巨大的代价,因为这样做只是为了接收所有消息,即根本不进行过滤)。同样的正式服务可以通过更轻量级的多个节点来实现-PUSH/PULL
-在一个节点上。如果不需要任何反向/双向/更复杂的正式通信,那么这一个PUSH/PULL
就可以完成请求的任务c)考虑到上述要求,您的重点似乎放在通过连接过早发送消息而不丢失消息上。在ZeroMQ设置中,有一些工具可以确定这一点,但您却不小心使用它们:
zmq.IMMEDIATE
可能会在没有现成连接工作(或从未工作)的情况下使用AccessNode的阻塞状态errno
(或zmq.errno()
对于POSIX不兼容的操作系统/Win32等)处理可能有助于您的代码检测&;在distributed-computing的整个生命周期内,对“自治代理网络”中发生的任何和所有特定情况作出反应(无论代理实际上是“物理”分布还是共同定位,如此处所示)。不失去控制是这里的核心责任。什么是控制代码,它在失去控制的状态下自锁,甚至无法控制自己;)李>d)切勿使用
{ .recv() | .send() | .poll() | ... }
-方法的阻塞形式。教科书中的例子与专业的信令/消息传递元平面实现应该是什么样子完全相反。事实上,从未-参考上述第5)项e)更好地重复使用
Context()
实例,而不是像上面所描述的那样使其成为可消费/一次性的。线程可以自由地共享一个预实例化的Context()
引擎,避免下一个大量重复的附加开销,如果每个分叉的线程都重新实例化一个可消费的/一次性的Context()
,那么就只需要一个短期的对等客户端线程f)如果有人知道更好的解决方案,请随时通知我们:o)
评论中的问题
当然,这里是NP
{ pgm:// | epgm:// | tipc:// }
-如果进一步向更高的性能级别发展,这里的传输可能会很有趣嗯,在O/p中没有提到。任何
XPUB
s/XSUB
s的分层都可能工作得很好,问题在于连接管理级别当然,由于RTO连接的链路上没有可用的订阅者,随时准备“通过线路”立即发送,因此任何消息都无法发送(并且可以无声地丢弃,这就是wha)你不试着去反抗吗。这就是
zmq.IMMEDIATE
可以通过调用.setsockopt()
-方法来管理的相关问题 更多 >
编程相关推荐