芹菜在导入库后找不到定期任务

2024-06-25 22:48:33 发布

您现在位置:Python中文网/ 问答频道 /正文

我尝试用芹菜创建周期性任务,芹菜每天从网上下载一些文件。但是当我尝试在创建任务的文件中导入库时遇到了一个问题。我得到一个错误Received unregistered task of type 'download_data_nist.tasks.download_data'。如果我删除导入, 任务执行时没有错误。你知道吗

我已经把芹菜放进冰箱了设置.py地址:

from celery.schedules import crontab

CELERY_BROKER_URL = 'amqp://localhost'
CELERY_TIMEZONE = 'CET'
CELERY_BEAT_SCHEDULE = {
    'task-number-one': {
        'task': 'download_data_nist.tasks.download_data',
        'schedule': crontab(minute='*/1'),
    },
}

我创造了芹菜.py在我的应用的根文件夹中:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'AplikacijaZaPregledRanljivosti.settings')

app = Celery('AplikacijaZaPregledRanljivosti')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

我将此代码添加到根文件夹中的init.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

当我出错时任务.py看起来像这样:

from __future__ import absolute_import, unicode_literals

from celery import task
import json
import re
import requests
import zipfile
from django.conf import settings

@task()
def download_data():
    return "Data downloaded"

任务执行时任务.py看起来像这样:

from __future__ import absolute_import, unicode_literals

from celery import task

@task()
def download_data():
    return "Data downloaded"

如何导入库而不出错?你知道吗


Tags: thefrompyimportapptaskdatadownload
1条回答
网友
1楼 · 发布于 2024-06-25 22:48:33

当您从芹菜工人那里得到unregistered task错误时,您需要确定两件事:

  1. settings.py中通知芹菜你的应用程序模块。这还将帮助您了解可能遇到的依赖性错误:
# settings.py
CELERY_IMPORTS = (
    'your_app_name.tasks'
)
  1. 确保您满足了虚拟环境中的所有依赖项库等。在这种情况下,您需要确保已安装requests

相关问题 更多 >