芹菜停工工

2024-10-02 22:30:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用的是celery(solo pool with concurrency=1),我希望能够在特定任务运行后关闭worker。一个警告是,我想避免任何可能的工人接下任何进一步的任务后。在

以下是我在大纲中的尝试:

from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun

app = Celery()
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    raise WorkerShutdown()

但是,当我管理工人的时候

^{pr2}$

执行任务

add.delay(1,4)

我得到以下信息:

 -------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x7f596896ce90
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)

任务重新排队,将在另一个工作线程上再次运行,从而导致循环。在

当我在任务本身中移动WorkerShutdown异常时,也会发生这种情况。在

@app.task
def add(x, y):
    print(x + y)
    raise WorkerShutdown()

有没有一种方法可以让我在完成某项任务后关闭工人,同时避免这种不幸的副作用?在


Tags: fromimportaddapptaskdefwithsolo
3条回答

关闭工作进程的建议过程是发送TERM信号。这将导致芹菜工人在完成任何当前正在运行的任务后关闭。如果向工作进程的主进程发送QUIT信号,工作进程将立即关闭。在

然而,celery文档通常从命令行或通过systemd/initd来讨论这一点,但是celery还通过celery.app.control提供了一个远程工作者控制API。
您可以revoke一个任务来阻止工作线程执行该任务。这应该可以防止您遇到的循环。此外,控件也以这种方式支持工作线程的shutdown。在

所以我想下面这些会让你得到你想要的行为。在

@app.task(bind=True)
def shutdown(self):
    app.control.revoke(self.id) # prevent this task from being executed again
    app.control.shutdown() # send shutdown signal to all workers

由于当前不可能从任务内确认任务,然后继续执行所述任务,使用revoke的方法可以避免这个问题,因此,即使任务再次排队,新的工作人员也会忽略它。在

或者,下面的操作也会阻止重新交付的任务再次执行。。。在

^{pr2}$

同样值得注意的是,^{}还有一个revoke方法,它为您调用self.app.control.revoke。在

如果您需要关闭某个特定的worker并且事先不知道它的名称,可以从任务属性中获取它。根据以上答案,您可以使用:

app.control.shutdown(destination=[self.request.hostname])

或者

^{pr2}$

注意:

  • 一个worker应该以一个名字开始(选项'-n'
  • 任务应该用bind=True参数定义。在

如果您关闭工作线程,则在任务完成后,它将不再重新排队。在

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    app.control.broadcast('shutdown')

这将在任务完成后正常关闭工作线程。在

^{pr2}$

注意:广播将关闭所有工人。如果要关闭特定的worker,请使用名称启动worker

celery -A celeryapp  worker -n self_killing  concurrency=1  pool=solo

现在可以用destination参数关闭它。在

app.control.broadcast('shutdown', destination=['celery@self_killing'])

相关问题 更多 >