pyzmq发布服务器可以从类实例中操作吗?

2024-06-01 09:13:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我为一个发布者编写了以下代码,它实例化了几个类实例并发布了一些消息。在

但是,我在用户端没有收到任何东西。在

出版商

import zmq
import time
from multiprocessing import Process

class SendData:
    def __init__(self, msg, port):
        self.msg = msg
        self.port = port
        ctx = zmq.Context()
        self.sock = ctx.socket(zmq.PUB)
        self.sock.bind('tcp://127.0.0.1:'+str(self.port))
        time.sleep(1)

    def sender(self):
        self.sock.send_json(self.msg)

def main():
    for device, port in zip(['2.2.2.2', '5.5.5.5'],[5001, 5002]):
        msg = {device:'Some random message'}
        instance = SendData(device, port)
        Process(target=instance.sender).start()

if __name__ == "__main__":
    main()

订户

^{pr2}$

在出版商出版任何东西之前,我就有了订阅者。另外,我可以看到TCP连接是建立的,所以连接是建立的。在

  • pyzmq版本16.0.0
  • python 2.7版

Q1:类实例是否支持0mq发布服务器?
问题2:我遗漏了什么?在


Tags: 实例importselftimemainportdevicedef
2条回答

A1:是的,是的

A2:使用范围冲突v/s零共享,ZeroMQ准则之一

一旦您的原始发布者代码在main()中执行,类实例化过程将通过.__init__()构造函数方法创建(即在main()-process使用范围内),它自己的Context()-实例属于此实例(包括它的所有派生子对象(sockets等))main()-处理。在

接下来,对Process(...)的调用将启动另外几个进程,这些进程从main()-作用域接收类实例(问题是这些实例已经创建了ZeroMQ不可共享的玩具)。在

解决方案?在

一种可能的肮脏的快速破解方法可能是延迟ZeroMQContext()是的,只需将它从.__init__()移动到某个.aDeferredSETUP()中的某个.aDeferredSETUP(),与main()-进程不同,你应该这样做,因为零共享是安全的。在

class SendData:
    def __init__(self, msg, port):
        self.msg      = msg
        self.port     = port

        self.NotSETUP = True
        self.ctx      = None
        self.sock     = None
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ L8R
        # ctx         = zmq.Context()
        # self.sock   = ctx.socket( zmq.PUB )
        # self.sock.bind( 'tcp://127.0.0.1:' + str( self.port ) )
        # time.sleep( 1 )
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ L8R

    def sender( self ):
        if      self.NotSETUP:
                self.aDeferredSETUP()

        self.sock.send_json( self.msg )

    def aDeferredSETUP( self ):     # create I/O-threads in Process(), not main()
        self.ctx    = zmq.Context() 
        self.sock   = self.ctx.socket( zmq.PUB )
        self.sock.bind( 'tcp://127.0.0.1:' + str( self.port ) )
        time.sleep( 1 )

如前所述,尝试在进程之间共享ZeroMQ上下文是这里的问题,user3666197的解决方案将起作用。 但是,在这种情况下,我建议将multiprocessing.Process子类化。这样,代码的哪个部分在哪个进程中执行就更清楚了。它还使您的代码更具可读性和可重用性。在

下面的代码为每个设备创建一个发送方进程。在程序运行期间,可以重用发送方进程以发送更多数据:

import multiprocessing
import queue
import zmq
import time

class Sender(multiprocessing.Process):

    def __init__(self, port):
        super(Sender, self).__init__()
        self._port = port
        self._messages = multiprocessing.Queue()
        self._do_stop = multiprocesing.Event()

    def run(self):
        """
        This code is executed in a new process.
        """
        ctx = zmq.Context()
        sock = ctx.socket(zmq.PUB)
        sock.bind("tcp://127.0.0.1:" + str(self._port))
        while not self._do_stop.is_set():
            try:
                msg = self._message.get_nowait()
                sock.send_json(msg)
            except queue.Empty:
                time.sleep(0.01)

    def stop(self):
        self._do_stop.set()

    def send_message(self, msg):
        self._messages.put(msg)

def main():
    data = zip(['2.2.2.2', '5.5.5.5'], [5001, 5002])
    # create senders
    senders = {device: Sender(port) for device, port in data}
    # start senders
    for device in senders:
        senders[device].start()
    # send messages
    for device, port in zip(['2.2.2.2', '5.5.5.5'],[5001, 5002]):
        msg = {device: 'Some random message'}
        senders[device].send_message(msg)
    # do more stuff here....
    # ... e.g. send more messages
    # ...
    # once you are finished, stop the subprocesses
    for device in senders:
        senders[device].stop()

我希望这有助于解决你的问题。在

相关问题 更多 >