<p>为此使用<code>virtualenv</code>会很方便。在</p>
<p>首先,就像@Gal说的,你需要确保你有<code>celery 4.x</code>。在</p>
<p>您可以通过<code>pip</code>安装此程序:</p>
<blockquote>
<p>pip install celery</p>
</blockquote>
<p>当然,您也可以安装<code>4.x</code>版本,将其添加到<code>requirements.txt</code>中,如下所示:</p>
<blockquote>
<p>celery==4.1.0</p>
</blockquote>
<p>或更高版本(如果将来可用)。在</p>
<p>然后您可以使用以下方法重新安装所有软件包:</p>
<ul>
<li><code>pip install -r requirements.txt</code></li>
</ul>
<p>这将确保你安装了特定的芹菜包。在</p>
<p>现在的芹菜部分,虽然你的代码可能没有错,但我将以一种方式写我如何让我的芹菜应用程序工作。在</p>
<p><b>初始化页:</b></p>
<pre><code>from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery_conf import app as celery_app
__all__ = ['celery_app']
</code></pre>
<p>芹菜_配置文件:</b></p>
^{pr2}$
<p><b>任务.py:</b></p>
<pre><code># -*- coding: utf-8 -*-
from __future__ import unicode_literals
import requests
from django.conf import settings
from django.http import HttpResponse
from celery.task import Task
from celery.five import python_2_unicode_compatible
from celery import Celery
app = Celery()
@python_2_unicode_compatible
class Update(Task):
name = 'tasks.update'
def run(self, *args, **kwargs):
# Run the task you want to do.
""" For me the regular TaskRegistry didn't work to register classes,
so I found this handy TaskRegistry demo and made use of it to
register tasks as classes."""
class TaskRegistry(Task):
def NotRegistered_str(self):
self.assertTrue(repr(TaskRegistry.NotRegistered('tasks.add')))
def assertRegisterUnregisterCls(self, r, task):
with self.assertRaises(r.NotRegistered):
r.unregister(task)
r.register(task)
self.assertIn(task.name, r)
def assertRegisterUnregisterFunc(self, r, task, task_name):
with self.assertRaises(r.NotRegistered):
r.unregister(task_name)
r.register(task, task_name)
self.assertIn(task_name, r)
def task_registry(self):
r = TaskRegistry()
self.assertIsInstance(r, dict, 'TaskRegistry is mapping')
self.assertRegisterUnregisterCls(r, Update)
r.register(Update)
r.unregister(Update.name)
self.assertNotIn(Update, r)
r.register(Update)
tasks = dict(r)
self.assertIsInstance(
tasks.get(Update.name), Update)
self.assertIsInstance(
r[Update.name], Update)
r.unregister(Update)
self.assertNotIn(Update.name, r)
self.assertTrue(Update().run())
def compat(self):
r = TaskRegistry()
r.regular()
r.periodic()
</code></pre>
<p>正如我在代码中所解释的那样,常规的<code>taskregistry</code>不能在Celery 4.x中使用,所以我使用了演示任务注册表。
当然,你也可以不使用类来完成任务,但我更喜欢使用类。在</p>
<p><b>设置.py:</b></p>
<pre><code># Broker settings for redis
CELERY_BROKER_HOST = '<YOUR_HOST>'
CELERY_BROKER_PORT = 6379
CELERY_BROKER_URL = 'redis://'
CELERY_DEFAULT_QUEUE = 'default'
# Celery routes
CELERY_IMPORTS = (
'PATH.TO.tasks' # The path to your tasks.py
)
CELERY_DATABASE_URL = {
'default': '<CELERY_DATABASE>', # You can also use your already being used database here
}
INSTALLED_APPS = [
...
'PATH.TO.TASKS' # But exclude the tasks.py from this path
]
LOGGING = {
...
'loggers': {
'celery': {
'level': 'DEBUG',
'handlers': ['console'],
'propagate': True,
},
}
}
</code></pre>
<p>我用以下命令启动我的工人:</p>
<blockquote>
<p>redis-server --daemonize yes</p>
<p>celery multi start worker -A PATH.TO.TASKS -l info --beat # But exclude tasks.py from the path</p>
</blockquote>
<p>我希望这些信息可以帮助你或任何正在与芹菜作斗争的人。在</p>
<p>编辑:</p>
<p>请注意,我将worker作为守护进程启动,因此您实际上无法在控制台中看到日志。
对我来说,它记录在<code>.txt</code>文件中。在</p>
<p>另外,请注意要使用的路径,例如,您需要包含应用程序的路径,如下所示:</p>
<blockquote>
<p>project.apps.app</p>
</blockquote>
<p>对于其他情况,您需要包括任务.py没有<code>.py</code>,我写下了何时排除该文件和何时不排除。在</p>
<p>编辑2:</p>
<blockquote>
<p>The @shared_task decorator returns a proxy that always uses the task instance in the current_app.
This makes the @shared_task decorator useful for libraries and reusable apps, since they will not have access to the app of the user.</p>
</blockquote>
<p>请注意,<code>@shared_task</code>无权访问用户的应用程序。
你当前尝试注册的应用没有访问你的应用的权限。
您实际想要用于注册任务的方法是:</p>
<pre><code>from celery import Celery
app = Celery()
@app.task
def test_celery(x, y):
logger = logging.getLogger("AENIMA")
print("EUREKA")
logger.debug("EUREKA")
</code></pre>