Django redis芹菜和芹菜节拍的正确设置

2024-10-03 09:08:31 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直试图设置django+celeri+redis+celeri_-beats,但这给我带来了麻烦。文档非常简单,但是当我运行django服务器、redis、celeri和celery beats时,不会打印或记录任何内容(我所有的测试任务都会记录一些日志)。在

这是我的文件夹结构:

- aenima 
 - aenima
   - __init__.py
   - celery.py

 - criptoball
   - tasks.py

在芹菜.py看起来像这样:

^{pr2}$

以及任务.py看起来像这样:

from __future__ import absolute_import, unicode_literals
from datetime import datetime, timedelta
from celery import shared_task
import logging

from django_celery_beat.models import PeriodicTask, IntervalSchedule

cada_10_seg = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)

test_celery_periodic = PeriodicTask.objects.create(interval=cada_10_seg, name='test_celery', task='criptoball.tasks.test_celery',
expires=datetime.utcnow()+timedelta(seconds=30))

@shared_task
def test_celery(x, y):
    logger = logging.getLogger("AENIMA")
    print("EUREKA")
    logger.debug("EUREKA")

这是django_celery_beat文档

不知道我错过了什么。当我跑的时候

celery -A aenima beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

celery -A aenima worker -l debug

redis-cli ping PONG

django runserver和redis服务器,我什么也没打印出来。在

在设置.py在

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE
CELERY_IMPORTS = ('criptoball.tasks',)

到目前为止,在这个话题上还没有找到答案。在

我想解决这一切,这个错误可能只是许多错误中的一个。非常感谢你的帮助!在

编辑:

增加redis的设置,以不同的方式声明任务,提高调试级别。现在的错误是:

Received unregistered task of type u'tasks.test_celery'. The message has been ignored and discarded.

Did you remember to import the module containing this task? Or maybe you're using relative imports? KeyError: u'aenima.criptoball.tasks.test_celery'

我相信西芹的文件很差。在

编辑2 在尝试了所有的事情之后,当我把所有的任务都放在同一个任务里时,它起作用了芹菜.py文件。@shared_任务无效,必须使用@应用程序任务. 在


Tags: djangofrompytestimportredistaskdatetime
3条回答

我以前也有过这些问题。这不是你的密码。这通常是环境问题。 您应该运行virtualenv下的所有内容,添加一个带有特定包版本的requirements.txt文件。在

有一个关于celery 4.xdjango 1.x的已知问题,因此您应该考虑您正在使用的包。在

This教程将详细解释如何用芹菜构建virtualenv。在

如果你能告诉我你的软件包版本,我可能会尝试以不同的方式提供帮助。在

编辑:

我觉得这是你经营芹菜的方式。如果我们解决了第一个问题,请尝试使用以下方法:

celery -A aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

或者

^{pr2}$

您得到的最新错误与您的模块发现有关。 先试试吧。在

Received unregistered task of type u'tasks.test_celery'. The message has been ignored and discarded.

Did you remember to import the module containing this task? Or maybe you're using relative imports?

可能您的任务路径不正确,应该是:

app.conf.beat_schedule = {
    'test-every-30-seconds': {
        'task': 'criptoball.tasks.test_celery',
        'schedule': 30.0,
        'args': (16, 16)
    }, 
}

tasks.test_celery应该是完整路径:criptoball.tasks.test_celery

为此使用virtualenv会很方便。在

首先,就像@Gal说的,你需要确保你有celery 4.x。在

您可以通过pip安装此程序:

pip install celery

当然,您也可以安装4.x版本,将其添加到requirements.txt中,如下所示:

celery==4.1.0

或更高版本(如果将来可用)。在

然后您可以使用以下方法重新安装所有软件包:

  • pip install -r requirements.txt

这将确保你安装了特定的芹菜包。在

现在的芹菜部分,虽然你的代码可能没有错,但我将以一种方式写我如何让我的芹菜应用程序工作。在

初始化页:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery_conf import app as celery_app

__all__ = ['celery_app']

芹菜_配置文件:

^{pr2}$

任务.py:

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import requests

from django.conf import settings
from django.http import HttpResponse

from celery.task import Task
from celery.five import python_2_unicode_compatible
from celery import Celery
app = Celery()


@python_2_unicode_compatible
class Update(Task):
    name = 'tasks.update'

    def run(self, *args, **kwargs):
        # Run the task you want to do.

""" For me the regular TaskRegistry didn't work to register classes, 
so I found this handy TaskRegistry demo and made use of it to 
register tasks as classes."""
class TaskRegistry(Task):

    def NotRegistered_str(self):
        self.assertTrue(repr(TaskRegistry.NotRegistered('tasks.add')))

    def assertRegisterUnregisterCls(self, r, task):
        with self.assertRaises(r.NotRegistered):
            r.unregister(task)
        r.register(task)
        self.assertIn(task.name, r)

    def assertRegisterUnregisterFunc(self, r, task, task_name):
        with self.assertRaises(r.NotRegistered):
            r.unregister(task_name)
        r.register(task, task_name)
        self.assertIn(task_name, r)

    def task_registry(self):
        r = TaskRegistry()
        self.assertIsInstance(r, dict, 'TaskRegistry is mapping')

        self.assertRegisterUnregisterCls(r, Update)

        r.register(Update)
        r.unregister(Update.name)
        self.assertNotIn(Update, r)
        r.register(Update)

        tasks = dict(r)
        self.assertIsInstance(
            tasks.get(Update.name), Update)

        self.assertIsInstance(
            r[Update.name], Update)

        r.unregister(Update)
        self.assertNotIn(Update.name, r)

        self.assertTrue(Update().run())

    def compat(self):
        r = TaskRegistry()
        r.regular()
        r.periodic()

正如我在代码中所解释的那样,常规的taskregistry不能在Celery 4.x中使用,所以我使用了演示任务注册表。 当然,你也可以不使用类来完成任务,但我更喜欢使用类。在

设置.py:

# Broker settings for redis
CELERY_BROKER_HOST = '<YOUR_HOST>'
CELERY_BROKER_PORT = 6379
CELERY_BROKER_URL = 'redis://'
CELERY_DEFAULT_QUEUE = 'default'

# Celery routes
CELERY_IMPORTS = (
    'PATH.TO.tasks' # The path to your tasks.py
)

CELERY_DATABASE_URL = {
    'default': '<CELERY_DATABASE>', # You can also use your already being used database here
}

INSTALLED_APPS = [
    ...
    'PATH.TO.TASKS' # But exclude the tasks.py from this path
]

LOGGING = {
    ...
    'loggers': {
        'celery': {
            'level': 'DEBUG',
            'handlers': ['console'],
            'propagate': True,
        },
    }
}

我用以下命令启动我的工人:

redis-server --daemonize yes

celery multi start worker -A PATH.TO.TASKS -l info --beat # But exclude tasks.py from the path

我希望这些信息可以帮助你或任何正在与芹菜作斗争的人。在

编辑:

请注意,我将worker作为守护进程启动,因此您实际上无法在控制台中看到日志。 对我来说,它记录在.txt文件中。在

另外,请注意要使用的路径,例如,您需要包含应用程序的路径,如下所示:

project.apps.app

对于其他情况,您需要包括任务.py没有.py,我写下了何时排除该文件和何时不排除。在

编辑2:

The @shared_task decorator returns a proxy that always uses the task instance in the current_app. This makes the @shared_task decorator useful for libraries and reusable apps, since they will not have access to the app of the user.

请注意,@shared_task无权访问用户的应用程序。 你当前尝试注册的应用没有访问你的应用的权限。 您实际想要用于注册任务的方法是:

from celery import Celery
app = Celery()

@app.task
def test_celery(x, y):
    logger = logging.getLogger("AENIMA")
    print("EUREKA")
    logger.debug("EUREKA")

相关问题 更多 >