我正在尝试使用Alvaro Videla的reverse topic exchange rabbitmq plugin创建一个芹菜应用程序。工作人员似乎可以使用此交换与代理进行良好的连接,但当我主题反向路由我的任务时,不要选择“#”或“*”,其工作方式类似于直接交换。在
这就是我的队列:
Queue(name='cluster',
exchange = Exchange(name='cluster',
type='x-rtopic',
delivery_mode='persistent',
durable=True),
routing_key='intel.%d.%s' % (n_cores, hostname),
durable = True,)
现在想象两个工人使用下面的路由键
这是关于我正在尝试的任务和我所经历的任务的路由键:
^{pr2}$为了找出问题出在哪里,我测试了这个插件,使用pika和kombu进行简单的消息传递,两者都很好,完全符合预期。所以我想一定是Celery交换(路由)消息的方式有问题。也许我应该创建一个自定义路由类!?在
提前谢谢。在
经过一段时间的挖掘,我发现反向主题交换插件可以很好地处理芹菜。我误解了Rabbitmq队列的工作方式。为了使它完全工作,我必须定义一个我的路由器,在这里任务被路由到包含这些队列的交换机,并且只指定路由密钥和交换名称,这样任务仍然会循环连接到该交换机的节点,并且能够在任务路由密钥上使用通配符。在
所以队列设置如下:
路由队列='intel.8.pchost'
芹菜队列=(
路由器应该是这样的:
类MyRouter(对象):
^{pr2}$然后,我将把路由键作为任务的Kargs传递,可以在任务“intel.#”中设置,这意味着该任务将由任何队列以intel开头的worker执行。在
唯一的问题!这是由于某些原因,我不得不使用.apply\u async而不是.delay来执行任务。在
整个想法是能够根据集群中可用的机器规格来路由我的任务。有些任务只能在intel处理器上运行,而其他任务只能在amd上运行,或者根据节点中的内核数或使用主机名定义。在
希望这能帮助将来想做同样事情的人。在
相关问题 更多 >
编程相关推荐