from celery import current_app
# `after_task_publish` is available in celery 3.1+
# for older versions use the deprecated `task_sent` signal
from celery.signals import after_task_publish
# when using celery versions older than 4.0, use body instead of headers
@after_task_publish.connect
def update_sent_state(sender=None, headers=None, **kwargs):
# the task may not exist if sent using `send_task` which
# sends tasks by name, so fall back to the default result backend
# if that is the case.
task = current_app.tasks.get(sender)
backend = task.backend if task else current_app.backend
backend.store_result(headers['id'], None, "SENT")
>>> from tasks import add
>>> from celery.utils import uuid
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid())
>>> id = r.task_id
>>> id
'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd'
>>> if not "blubb".startswith("celery-task-id-"): print "Unknown task id"
...
Unknown task id
>>> if not id.startswith("celery-task-id-"): print "Unknown task id"
...
现在我使用以下方案:
芹菜在任务发送时不写状态,这在一定程度上是一种优化 (见http://docs.celeryproject.org/en/latest/userguide/tasks.html#state)。
如果您真的需要,可以简单地添加:
然后,您可以测试挂起状态,以检测任务没有(似乎没有) 已发送:
AsyncResult.state在任务ID未知的情况下返回挂起。
http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending
如果需要区分未知ID和现有ID,可以提供自定义任务ID:
相关问题 更多 >
编程相关推荐