我正在寻找一种方法,将kombu用作tornado sockjs和Django应用服务器之间的MQ适配器。我做了一些类似的事情:
class BrokerClient(ConsumerMixin):
clients = []
def __init__(self):
self.connection = BrokerConnection(settings.BROKER_URL)
self.io_loop = ioloop.IOLoop.instance()
self.queue = sockjs_queue
self._handle_loop()
@staticmethod
def instance():
if not hasattr(BrokerClient, '_instance'):
BrokerClient._instance = BrokerClient()
return BrokerClient._instance
def add_client(self, client):
self.clients.append(client)
def remove_client(self, client):
self.clients.remove(client)
def _handle_loop(self):
try:
if self.restart_limit.can_consume(1):
for _ in self.consume(limit=5):
pass
except self.connection.connection_errors:
print ('Connection to broker lost. '
'Trying to re-establish the connection...')
self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop)
def get_consumers(self, Consumer, channel):
return [Consumer([self.queue, ], callbacks=[self.process_task])]
def process_task(self, body, message):
for client in self.clients:
if hasattr(body, 'users') and client.user.pk in body.users:
client.send(body)
message.ack()
但是tornado在执行handle_循环时被阻塞(如预期)。在
有什么办法可以防止这种情况吗?在
我知道Pika库适配器Tornado,但我想使用kombu,因为它已经在项目中使用,并且具有灵活的传输方式。在
更新:
已将“句柄”回路更改为生成器函数
^{pr2}$
我也有类似的需求,以非阻塞的方式在Kombu/RabbitMQ和ZeroMQ之间切换。解决方案是使用Gevent对socket库进行monkey补丁,这样Kombu也将成为非阻塞的。我的“main”线程运行了Kombu drain_events回调,在另一个gevent线程中,我有一个从ZeroMQ套接字接收消息的循环。效果很好。在
这对于librabbitmq也不起作用,因为它在C中执行自己的套接字,而不受Gevent的影响。在
最后,我找到了RabbitMQ后端的正确解决方案:
对于其他后端,您可以使用问题中发布的解决方案
注意:不能与librabbitmq一起使用
相关问题 更多 >
编程相关推荐