pyzmq订阅服务器如何检测脱机发布服务器?

2024-10-01 13:32:16 发布

您现在位置:Python中文网/ 问答频道 /正文

一个SUB-抄写员如何确保另一边有一个PUB-lisher,否则根本就不会启动?在

详细信息:
当我的SUB-scriber代码与正在运行的远程PUB-lisher一起工作时,当我.connect()+将我的客户机订阅到一个虚拟服务器(比如alocalhost)时,它不会注意到没有PUB-lisher正在运行,它只是启动并等待。在

我使用标准程序:

    sock = context.socket(zmq.SUB)
    sock.connect("tcp://{}:{}".format(host, port))
    topic_filter = 'blah'
    sock.setsockopt_string(zmq.SUBSCRIBE, topic_filter)
    # here should come something that warns about offline publisher...

Tags: 代码服务器客户机topic远程connect详细信息zmq
1条回答
网友
1楼 · 发布于 2024-10-01 13:32:16

唯一的酒吧/酒吧没有办法创造这种局面

ZeroMQ在概念上和实践上都是一个强大的工具箱。我们不应试图“弯曲”库原语-这些原语本身被理解为更复杂的消息传递和信令目的的构建块,而不是一个现实的解决方案-这样做的事情没有主要涵盖在最初的S可伸缩F正常C沟通P模式设计原型中。在

原型只是PUB/SUB只是PUB/SUB。在

PUB-广播给所有人,如果有的话,SUB,每个人,如果有的话,SUB监听已经到达的内容(如果有任何消息到达,则应用过滤器。。。是的,在SUB-侧加载网络上进行过滤)。在


不过,我们可以设计一个多原型的方法来解决这个问题

让我为最简单的方案画一个基本方案。在

假设您完全控制了双方(设计和实现)。在

# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
#
# [Side A]
#        |_____aKnockSOCK = context.socket( zmq.PAIR )
#        |     + .setsockopt( zmq.CONFLATE )
#        |     + .bind()
#        |
#        |_____anEmitSOCK = context.socket( zmq.PUB )
#              + .bind()
#
isNotReceivedSigEXIT  = False
while( isNotReceviedSigEXIT ):
       if ( 0 == aKnockSOCK.poll( aFastPollIN, zmq.POLLIN ) ):
           # nobody new knocking to setup ...
           #                         
           # do the main job,
           #    with countdown segmentation
           #    to escape to the outer loop
           #    so as to check for new SUB-s
           #    knocking as they come to the show
           #                         
       else:
           aKnockSOCK.send( "aKnockSOCK ACK: service is ready. Go .connect()" )
#
#                             
# ZeroMQ resources graceful termination 
#        socket closes + context dismantle & clean exit



# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
#
#[Side B]
#        |_____aKnockSOCK = context.socket( zmq.PAIR )
#        |     + .setsockopt( zmq.CONFLATE )
#        |
#        |_____aRecvrSOCK = context.socket( zmq.SUB )
#
#
isToBreakEXIT = False
for nthAttempt in range( 10 ):
    if ( isToBreakEXIT ):
         break
    try:
          aKnockSOCK.connect( ... )
    except:
          ReportConnectERROR( ... )
          sleep( ... )
          continue
    #                   once local aKnockSOCK got instantiated
    for kthPoll in range( 10 ):
        if ( 0 == aKnockSOCK.poll( aLongPollIN, zmq.POLLIN ) ):
           sleep( thisCouldBeAddedToLongPollIN )
        else:
           #                     - .recv() + dismantle
           aKnockSOCK.recv()
           aKnockSOCK.setsockopt( zmq.LINGER, 0 )
           aKnockSOCK.close()
           #                     - .connect() + use
           aRecvrSOCK.connect( ... )
           aRecvrSOCK.setsockopt( zmq.SUBSCRIBE, ... )
           #                     - [Side B] main job start
           #                     - [Side B] main job end
           isToBreakEXIT = True
           break
    pass
#                             
# ZeroMQ resources graceful termination 
#        socket closes + context dismantle & clean exit

相关问题 更多 >