如何使用芹菜在Django上设置定期任务?

2024-09-28 18:48:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我的当前代码:

from celery.task.schedules import crontab
from celery.decorators import task, periodic_task


@periodic_task(run_every=crontab(hour=15, minute=55, day_of_week="wed"))
def demo():
    print("testing------------------------")

设置文件:

CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://guest:guest@localhost//')

没用了,我是不是漏掉了什么?你知道吗

提前感谢


Tags: run代码fromimportdecoratorsurltaskbroker
1条回答
网友
1楼 · 发布于 2024-09-28 18:48:00

我认为你应该使用下面的代码,它应该是你的工作。你知道吗

celery_app = celery("project_name")

@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
  sender.add_periodic_task(5.0, demo.s(args))

@celery_app.task(bind=True)
def demo(args):
   print("testing            ")

相关问题 更多 >