为什么这个zmq代码不起作用?

2024-10-04 09:17:53 发布

您现在位置:Python中文网/ 问答频道 /正文

这个测试不起作用。在

class PrintHandler(MessageHandler):
    def handle_message(self, message):
        print(message)

class FileHandler(MessageHandler):
    def handle_message(self, message):
        with open('nana', 'w') as f:
            f.write(message)


class SubscribeProcess(Process):


    def __init__(self, handler):
        super(SubscribeProcess, self).__init__(group=None, target=None, name=None, args=(), kwargs={})
        self.handler = handler


    def run(self):

        self.address = TcpAddress(host='127.0.0.1', port=5555)
        subscriber = ZmqSubscriber(ZmqBlockingConnection(address=self.address, bind=False))
        subscriber.set_message_handler(self.handler)
        print('............')


class TestZmqSubscriber(TestCase):
    def test_set_message_handler(self):
        address = TcpAddress(host='127.0.0.1', port=5555)
        pub_connection = ZmqBlockingConnection(address, bind=True)
        publisher = ZmqPublisher(pub_connection)
        p = SubscribeProcess(handler=PrintHandler())
        p.start()
        while True:
            publisher.publish('Message number {}'.format(2))

我现在说的不是单元测试。但我想先在控制台中查看收到的消息。然后我会写适当的测试。在

虽然这两个脚本工作得很好。在

^{pr2}$
address = TcpAddress(host='127.0.0.1', port=5555)
pub_connection = ZmqBlockingConnection(address, bind=True)
publisher = ZmqPublisher(pub_connection)
while True:
    publisher.publish('Message number {}'.format(2))

内部subscriber.set_消息处理程序(处理者)实际上是这个吗

def start_receiving_messages(self, message_handler):
    while True:
        message_handler.handle_message(self.socket.recv())

在调试器中,我看到代码无限地挂起接收插座()

也许我用错了多重处理?在

编辑1

class ZmqBlockingConnection(Connection):

def start_receiving_messages(self, message_handler):
    while True:
        message_handler.handle_message(self.socket.recv())

def send_message(self, message):
    self.socket.send(message)

def __init__(self, address, bind, hwm=1000):
    self.hwm = hwm
    self.bind = bind
    self.address = address
    self.socket = None

def set_hwm(self, hwm):
    self.socket.set_hwm(hwm)

def configure(self, socket_type):
    self.socket = zmq.Context().socket(socket_type)
    if self.bind:
        self.socket.bind(str(self.address))
    else:
        self.socket.connect(str(self.address))
    self.set_hwm(self.hwm)

Tags: selfnonetruemessagebindaddressdefsocket
1条回答
网友
1楼 · 发布于 2024-10-04 09:17:53

好吧,问题出在

def configure(self, socket_type):
self.socket = zmq.Context().instance().socket(socket_type)
if self.bind:
    self.socket.bind(str(self.address))
else:
    self.socket.connect(str(self.address))
self.set_hwm(self.hwm)

因此,我开始创建上下文实例,而不是使用singleton,现在它正在工作。在

相关问题 更多 >