Rabbitmq使用tornad非阻塞消息

2024-09-28 17:23:16 发布

您现在位置:Python中文网/ 问答频道 /正文

class WSHandler(tornado.websocket.WebSocketHandler):
    clients = []
    def open(self, name):
        # WSHandler.clients.append(self)
        # liveWebSockets.add(self)
        self.id = name
        self.clients.append(self)
        # self.application.pc.add_event_listener(self)
        print 'new connection'



    def on_message(self, message):
        print 'message received:  %s' % message
        # Reverse Message and send it back
        print 'sending back message: %s' % message[::-1]


        # pika sending message
        import pika
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                       'localhost'))
        channel = connection.channel()
        # clients.append(self)
        channel.queue_declare(queue='hello')
        # print dir(self)
        message_rabbit_mq = {
                                'web_socket': self.id,
                                'message': message
                            }
        message_rabbit_mq = json.dumps(message_rabbit_mq)                    
        channel.basic_publish(exchange='',
                              routing_key='hello',
                              body=message_rabbit_mq)
        connection.close()


        self.rabbit_connect()
        # def rabbit_connect():
        # pika receving message
        connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
        channel = connection.channel()

        channel.queue_declare(queue='hello')

        def callback(ch, method, properties, body):
            print(" [x] Received %r" % body)
            self.write_message(body)
            time.sleep(4)
            body_obj =  json.loads(body)
            if 'message' in body:
                if body_obj['message'] == "crack":
                    channel.stop_consuming()

        channel.basic_consume(callback,
                        queue='hello',
                        no_ack=True)

        channel.start_consuming()

        self.write_message("closed reference")

上面代码中的问题是

^{pr2}$

上述部分将阻塞on_message函数中的其余逻辑。如何使上述部分与其余逻辑异步运行? 这使得来自客户端的更多websocket消息无法通过处理。在


Tags: selfmessagehelloqueuedefchannelbodyconnection
1条回答
网友
1楼 · 发布于 2024-09-28 17:23:16

请尝试使用以下代码:

https://github.com/Gsantomaggio/rabbitmqexample/tree/master/webSocketPython

def threaded_rmq(): channel.queue_declare(queue="my_queue") logging.info('consumer ready, on my_queue') channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True) channel.start_consuming()

以及 if __name__ == "__main__": logging.info('Starting thread RabbitMQ') threadRMQ = Thread(target=threaded_rmq) threadRMQ.start()

相关问题 更多 >