Python中文
首页
教程
问答
标签
搜索
登录
注册
注册 芹菜 类型的任务
回答此问题可获得
20
贡献值,回答如果被采纳可获得
50
分。
<p>Python3.x,芹菜4.x。。。</p> <p>我有一个基于班级的任务。</p> <p><code>myproj/celery.py</code></p> <pre><code>from celery import Celery # django settings stuff... app = Celery('myproj') app.autodiscover_tasks() </code></pre> <p><code>app1/tasks.py</code></p> <pre><code>import celery class EmailTask(celery.Task): def run(self, *args, **kwargs): self.do_something() </code></pre> <p>如果我这样做了:</p> <pre><code>$ celery worker -A myproj -l info [tasks] . app2.tasks.debug_task . app2.tasks.test </code></pre> <p>所以,芹菜装饰器工作来注册任务,但是基于类的任务没有注册。</p> <h2>如何注册基于类的任务?</h2> <p><strong>更新1:</strong></p> <p>如果我将以下行添加到<code>app1/tasks.py</code></p> <pre><code>from myproj.celery import app email_task = app.tasks[EmailTask.name] </code></pre> <p>是的。</p> <pre><code>$ celery worker -A myproj -l info File "myproj/app1/tasks.py", line 405, in <module> email_task = app.tasks[EmailTask.name] File "/usr/local/lib/python3.5/site-packages/celery/app/registry.py", line 19, in __missing__ raise self.NotRegistered(key) celery.exceptions.NotRegistered </code></pre> <p><strong>更新2:</strong></p> <p>我可以通过包装器同步执行任务(<code>run</code>)。但是,我不能运行任务async,即通过<code>delay</code>。</p> <p><code>app1/tasks.py</code></p> <pre><code>@app.task def email_task(): """ Wrapper to call class based task """ task = EmailTask() # task.delay() # Won't work!!! task.run() </code></pre> <p>是的。</p> <pre><code>$./manage.py shell > from app1.tasks import EmailTask > task1 = EmailTask() > task1.run() # a-okay > task2 = EmailTask() > task2.delay() # nope <AsyncResult: 1c03bad9-169a-4a4e-a56f-7d83892e8bbc> # And on the worker... [2017-01-22 08:07:28,120: INFO/PoolWorker-1] Task app1.tasks.email_task[41e5bc7d-058a-400e-9f73-c853c0f60a2a] succeeded in 0.0701281649817247s: None [2017-01-22 08:10:31,909: ERROR/MainProcess] Received unregistered task of type None. The message has been ignored and discarded. </code></pre>
0 条评论
分类:
Python问答
请先
登录
后评论
默认排序
时间排序
1 个回答
匿名
1天前
擅长:python、mysql、java
<p>你可以找到完整的描述<a href="https://github.com/celery/celery/issues/3744" rel="noreferrer">here</a>,但对我来说,这已经足够添加</p> <pre><code>from myapp.celery import app app.tasks.register(MyTaskTask()) </code></pre>
请先
登录
后评论
针对此问题:
更多的回答
关注
89
关注
收藏
1
收藏,
216
浏览
网友 提问于 2天前
相关Python问题
想用靓汤抢夺价值
9 回答
想申请一份符合工作描述的简历吗
2 回答
想画网格,Python
7 回答
想白痴化我的Python战舰
7 回答
想看两列日期,但只上
3 回答
想看看我写的这个脚本读一个Fortran二进制fi吗
2 回答
想知道Django是如何实现ORM查询优化的吗
9 回答
想知道GeoDjango和地图服务吗
6 回答
想知道Image.resize操作在PIL文件中的详细工作方式吗
8 回答
想知道matplotlib pyplot为什么不调整边距
5 回答
想知道matplotlib颜色的颜色代码吗
9 回答
想知道pd.factorize,pd.get_dummies,sklearn.preprocessing.labeencoder和OneHotEncod之间的区别
4 回答
想知道property()在python中的实际用法吗
6 回答
想知道pyodidejs是如何工作的吗?
4 回答
想知道pyparsing==2.0.1的已完成处理依赖关系
1 回答
想知道ScikitLearn中的编码算法吗
2 回答
想知道VTK 5.04和VTK 5.4.2的vtkMassProperties差异吗
4 回答
想知道一个特定字符在一个特定句子中出现的次数吗
1 回答
想知道两个不同子集的重叠中有多少个对象吗
3 回答
想知道为什么is_素数函数的结果不正确吗
6 回答