如何在多个celeryd进程中跟踪撤销的任务

2024-09-30 01:20:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个提醒类型的应用程序,它使用“eta”参数来调度芹菜中的任务。如果提醒对象中的参数发生了变化(例如提醒时间),那么我将撤消先前发送的任务并对新任务进行排队。在

我想知道有没有什么好方法可以在celeryd重启时跟踪被撤销的任务。我希望能够动态地放大/缩小celeryd进程,而且似乎在发送revoke命令之后启动的任何celeryd进程仍将执行该任务。在

一种方法是保留一个已撤销任务ID的列表,但是这种方法会导致列表任意增长。修剪这个列表需要保证任务不再在RabbitMQ队列中,这似乎是不可能的。在

我还尝试为每个celeryd worker使用共享的--statedb文件,但似乎statedb文件只在worker终止时更新,因此不适合我想要完成的任务。在

提前谢谢!在


Tags: 文件对象方法应用程序类型列表参数进程
3条回答

有趣的问题,我认为使用广播命令应该很容易解决。 如果新的工作线程启动时,它会请求所有其他工作线程转储其已吊销的 新员工的任务。增加了两个新的遥控命令, 您可以使用@Panel.register轻松添加新命令

模块控件.py公司名称:

from celery.worker import state
from celery.worker.control import Panel

@Panel.register
def bulk_revoke(panel, ids):
    state.revoked.update(ids)

@Panel.register
def broadcast_revokes(panel, destination):
    panel.app.control.broadcast("bulk_revoke", arguments={
         "ids": list(state.revoked)},
         destination=destination)

添加到芹菜进口:

^{pr2}$

现在唯一缺少的问题是连接它以便新工人 启动时触发broadcast_revokes。我想你可以用worker_ready 此信号:

from celery import current_app as celery
from celery.signals import worker_ready

def request_revokes_at_startup(sender=None, **kwargs):
    celery.control.broadcast("broadcast_revokes",
                             destination=sender.hostname)

我不得不在我的项目中做一些类似的事情,并使用celerycam和{a1}。监视器获取任务的快照并定期将其保存在数据库中。并且有一个很好的用户界面来浏览和检查所有任务的状态。你甚至可以使用它even if your project is not Django based。在

不久前我实现了类似的东西,我提出的解决方案与你的非常相似。在

我解决这个问题的方法是让工作人员在作业运行时从数据库中获取Task对象(按照文档的建议,通过传递主键)。在您的情况下,在发送提醒之前,工作人员应该执行检查,以确保任务“准备好”可以运行。如果没有,它应该简单地返回而不做任何工作(假设ETA已经改变,并且另一个工人将接受新的工作)。在

相关问题 更多 >

    热门问题