2024-05-17 06:24:57 发布
网友
当我将任务路由到特定队列时,它会工作:
task.apply_async(queue='beetroot')
但如果我创造了一个链子:
chain = task | task
然后我写
chain.apply_async(queue='beetroot')
它似乎忽略了queue关键字并将其分配给默认的“芹菜”队列。
如果celery支持链式路由-所有任务都在同一队列中按顺序执行,那就太好了。
这已经很晚了,但我认为@mpaf提供的代码并不完全正确。
Context:在我的例子中,我有两个子任务,其中第一个子任务提供一个返回值,作为输入参数传递给第二个子任务。我在执行第二个任务时遇到了麻烦——我在日志中看到,芹菜会将第二个任务确认为第一个任务的回调,但它永远不会执行第二个任务。
这是我的非工作链代码-:
from celery import chain chain( module.task1.s(arg), module.task2.s() ).apply_async(countdown=0.1, queue='queuename')
使用@mpaf的答案中提供的语法,我可以执行这两个任务,但是执行顺序是随意的,第二个子任务没有被确认为第一个子任务的回调。我的想法是浏览文档,了解如何在子任务上显式设置队列。
这是工作代码-:
chain( module.task1.s(arg).set(queue='queuename'), module.task2.s().set(queue='queuename') ).apply_async(countdown=0.1)
好的,我弄明白了。
必须向子任务定义中添加所需的执行选项,如queue=或countdown=或通过部分:
子任务定义:
from celery import subtask chain = subtask('task', queue = 'beetroot') | subtask('task', queue = 'beetroot')
部分:
chain = task.s().apply_async(queue = 'beetroot') | task.s().apply_async(queue = 'beetroot')
然后执行链:
chain.apply_async()
或者
chain.delay()
任务将被发送到“beetroot”队列。最后一个命令中的额外执行参数将不起任何作用。在Chain(或Group,或任何其他Canvas原语)级别应用所有这些执行参数可能会比较好。
我是这样做的:
subtask = task.s(*myargs, **mykwargs).set(queue=myqueue) mychain = celery.chain(subtask, subtask2, ...) mychain.apply_async()
这已经很晚了,但我认为@mpaf提供的代码并不完全正确。
Context:在我的例子中,我有两个子任务,其中第一个子任务提供一个返回值,作为输入参数传递给第二个子任务。我在执行第二个任务时遇到了麻烦——我在日志中看到,芹菜会将第二个任务确认为第一个任务的回调,但它永远不会执行第二个任务。
这是我的非工作链代码-:
使用@mpaf的答案中提供的语法,我可以执行这两个任务,但是执行顺序是随意的,第二个子任务没有被确认为第一个子任务的回调。我的想法是浏览文档,了解如何在子任务上显式设置队列。
这是工作代码-:
好的,我弄明白了。
必须向子任务定义中添加所需的执行选项,如queue=或countdown=或通过部分:
子任务定义:
部分:
然后执行链:
或者
任务将被发送到“beetroot”队列。最后一个命令中的额外执行参数将不起任何作用。在Chain(或Group,或任何其他Canvas原语)级别应用所有这些执行参数可能会比较好。
我是这样做的:
相关问题 更多 >
编程相关推荐