如何将基于班级的任务传递到CELERY_BEAT_时间表中

2024-10-01 15:35:58 发布

您现在位置:Python中文网/ 问答频道 /正文

正如在docs类中看到的那样,基于类的任务是表示复杂逻辑的一种公平方式。在

但是,文档没有指定如何将新创建的基于类的任务添加到CELERY_BEAT_SCHEDULE(使用django)

我试过的事情: celery.py

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, 'task_summary')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from payments.tasks.generic.payeer import PayeerPaymentChecker
    from payments.tasks.generic.ok_pay import OkPayPaymentChecker

    okpay_import = OkPayPaymentChecker()
    payeer_imprt = PayeerPaymentChecker()

    sender.add_periodic_task(60.0, okpay_import.s(),
                             name='OkPay import',
                             expires=30)

    sender.add_periodic_task(60.0, payeer_imprt.s(),
                             name='Payeer import',
                             expires=30)

--或者--

payments/task_summary.py

^{pr2}$

真的不知道该怎么办,恢复到: payments/task_summary.py/

from payments.tasks.generic.ok_pay import OkPayPaymentChecker
from payments.tasks.generic.payeer import PayeerPaymentChecker
from celery import shared_task


@shared_task
def run_payer():
    instance = PayeerPaymentChecker()
    return instance.run()


@shared_task
def run_okpay():
    instance = OkPayPaymentChecker()
    return instance.run()

我检查过但不能帮助我/解决问题的在线资源:


Tags: instancerunfrompyimportcomtasksummary
1条回答
网友
1楼 · 发布于 2024-10-01 15:35:58

我也花了一段时间才找到答案,因为这个问题在谷歌搜索结果中占据了很高的位置,我想我应该把它放在这里,让那些苦苦寻找答案的人知道:

您可以像普通任务一样添加它,但是使用类名。在

CELERY_BEAT_SCHEDULE = {
    'my_task_name': {
        'task': 'mymodule.tasks.MyTaskClass',
        'schedule': timedelta(seconds=60),
},

(假设您有mymodule/tasks.py与:

^{pr2}$

相关问题 更多 >

    热门问题