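<p>I'm trying to start an asynchronous task from a Flask route, poll the task's status location with JS, and retrieve the result once the task finishes. The task shows as "PENDING" and never succeeds or fails.</p>
<p>When I run <code>celery -A app worker --loglevel=debug</code>, I get this: <code>error: ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'.</code> I've done a lot of searching but haven't found the cause. A simplified example follows.</p>
<p>Here is the basic flow: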
<a href="https://i.stack.imgur.com/JTOlC.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/JTOlC.png" alt="enter image description here"/></a></p>
<ol>
<li>routes.py</li>
</ol>
<pre><code>...
from app.celery_tasks import *

# This route initiates a test
@main_bp.route('/test_celery')
def test_celery_route():
    run_test_url = url_for('main_bp.run_test_task', _external=True)
    response = requests.post(run_test_url)
    test_task_url = response.headers.get('Location')
    return render_template('main/celery_test_page.html', test_task_url=test_task_url)
</code></pre>
<ol start="2">
<li>Test URL endpoint and task status</li>
</ol>
<pre><code>
# This endpoint receives the request and kicks off a Celery task
@main_bp.route('/run_test_task', methods=['POST'])
def run_test_task():
    task = test_task.apply_async()
    return jsonify({}), 202, {'Location': url_for('main_bp.test_task_status',
                                                  task_id=task.id)}

# The task result lives here
@main_bp.route('/test_task_status/<task_id>')
def test_task_status(task_id):
    task = test_task.AsyncResult(task_id)
    response = {
        'state': task.state
    }
    return jsonify(response)
</code></pre>
<ol start="3">
<li>test_celery.html: HTML + JS</li>
</ol>
<pre><code>{% extends 'base.html' %}
{% block app_content %}
<h1>Test Page</h1>
<div id="test-div">
</div>
{% endblock app_content %}
{% block scripts %}
{{ super() }}
<script>
    var test_task_url = "{{ test_task_url }}";
    console.log(test_task_url);
    update_test_results(test_task_url);

    function update_test_results(status_url) {
        // send GET request to status URL
        $.getJSON(status_url, function(data) {
            console.log(data);
            console.log('Status Update function triggered')
            console.log(data['state'])
            if (data['state'] == 'SUCCESS') {
                console.log('Task succeeded')
                $("#test-div").append('<p>Task <b>Succeeded</b></p>')
            }
            else if (data['state'] == 'FAILURE') {
                console.log('Task failed')
                $("#test-div").append('<p>Task <b>Failed</b></p>')
            }
            else if (data['state'] == 'PENDING') {
                console.log('Task is pending')
                $("#test-div").append('<p>Task is <b>Pending</b></p>')
            }
            else {
                console.log('Other state found')
                // rerun in 2 seconds
                setTimeout(function() {
                    var status_url = "{{ test_task_url }}";
                    update_test_results(status_url);
                }, 2000);
            }
        });
    };
</script>
{% endblock scripts %}
</code></pre>
<p>I think the code above is fine, but I'm posting it anyway for context.</p>
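<p>As a rough sketch of the polling loop the template is meant to implement, here is the same idea in plain Python. The names <code>poll_until_done</code> and <code>fetch_state</code> are hypothetical; <code>fetch_state</code> stands in for the GET request to the status URL. Unlike the template, this sketch keeps polling while the state is PENDING, on the assumption that PENDING only means the task has not finished yet (Celery also reports PENDING for task ids it knows nothing about).</p>

```python
import time
from typing import Callable

# States after which polling can stop (Celery treats these as final).
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_until_done(fetch_state: Callable[[], str],
                    interval: float = 2.0,
                    max_attempts: int = 30) -> str:
    """Call fetch_state() until it returns a terminal state or attempts run out.

    fetch_state is a hypothetical stand-in for a GET to the status URL that
    returns the 'state' field of the JSON response.
    """
    state = "PENDING"
    for attempt in range(max_attempts):
        state = fetch_state()
        if state in TERMINAL_STATES:
            break
        # PENDING, STARTED, RETRY, ...: not finished yet, so wait and retry.
        if attempt < max_attempts - 1:
            time.sleep(interval)
    return state
```

<p>Capping the number of attempts keeps the loop from running forever when a task never leaves PENDING, which is the situation described in this question.</p>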
<p>app/celery_tasks.py</p>
<pre><code>...
from . import make_celery

@celery.task(bind=True)
def test_task(self):
    time.sleep(5)
    return 1
</code></pre>
<p>app/__init__.py</p>
<pre><code>...
from celery import Celery

def make_celery(app=None):
    app = app or create_app()
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_BACKEND_URL'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    celery.app = app
    return celery

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)

def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(Config)
    celery.conf.update(app.config)
    ...
</code></pre>
<p>config.py
Note that it sits at the same level as the app directory, not inside it.</p>
<pre><code>class Config(object):
    ...
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_BACKEND_URL = 'redis://localhost:6379/0'
</code></pre>
<p>So, with this setup, when I run <code>test</code>, I see:</p>
<pre><code>[2021-05-15 07:43:00,384: DEBUG/MainProcess] | Worker: Starting Hub
[2021-05-15 07:43:00,384: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,384: DEBUG/MainProcess] | Worker: Starting Pool
[2021-05-15 07:43:00,899: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,900: DEBUG/MainProcess] | Worker: Starting Consumer
[2021-05-15 07:43:00,901: DEBUG/MainProcess] | Consumer: Starting Connection
[2021-05-15 07:43:00,920: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-05-15 07:43:00,920: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,921: DEBUG/MainProcess] | Consumer: Starting Events
[2021-05-15 07:43:00,931: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,931: DEBUG/MainProcess] | Consumer: Starting Heart
[2021-05-15 07:43:00,934: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,934: DEBUG/MainProcess] | Consumer: Starting Mingle
[2021-05-15 07:43:00,934: INFO/MainProcess] mingle: searching for neighbors
[2021-05-15 07:43:01,961: INFO/MainProcess] mingle: all alone
[2021-05-15 07:43:01,961: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,961: DEBUG/MainProcess] | Consumer: Starting Tasks
[2021-05-15 07:43:01,965: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,965: DEBUG/MainProcess] | Consumer: Starting Control
[2021-05-15 07:43:01,968: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,968: DEBUG/MainProcess] | Consumer: Starting Gossip
[2021-05-15 07:43:01,971: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,971: DEBUG/MainProcess] | Consumer: Starting event loop
[2021-05-15 07:43:01,971: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2021-05-15 07:43:01,972: INFO/MainProcess] celery@macbook-pro.lan ready.
[2021-05-15 07:43:01,972: DEBUG/MainProcess] basic.qos: prefetch_count->32
</code></pre>
<p>Then, when I load the route (/test_celery), I get the following error:</p>
<pre><code>[2021-05-15 07:43:06,717: ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
  File "/Users/me/soul/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 555, in on_task_received
    strategy = strategies[type_]
KeyError: 'app.celery_tasks.test_task'
</code></pre>
<p>I checked the docs referenced in this error, but I still can't resolve the problem.</p>
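<p>For context, the <code>KeyError</code> in the traceback comes from a lookup by task name (<code>strategies[type_]</code>). The snippet below is only a toy model of that lookup, not Celery's implementation; <code>ToyWorker</code> and its methods are hypothetical names. It shows how a message naming a task the worker process never registered produces this kind of error, while the same name succeeds once it is registered.</p>

```python
from typing import Callable, Dict


class ToyWorker:
    """Toy stand-in for a worker: maps task names to callables.

    This is an illustration only and does not reproduce Celery's internals.
    """

    def __init__(self) -> None:
        self.strategies: Dict[str, Callable[[], object]] = {}

    def register(self, name: str, func: Callable[[], object]) -> None:
        # Registration is what makes a task name known to this worker.
        self.strategies[name] = func

    def on_task_received(self, name: str) -> object:
        try:
            handler = self.strategies[name]
        except KeyError:
            # The name in the message has no entry in this worker's registry.
            return f"Received unregistered task of type {name!r}."
        return handler()


worker = ToyWorker()
# Before registration the lookup fails, as in the log above.
print(worker.on_task_received("app.celery_tasks.test_task"))
# After registration under the same name, the task runs.
worker.register("app.celery_tasks.test_task", lambda: 1)
print(worker.on_task_received("app.celery_tasks.test_task"))
```

<p>In this model the outcome depends only on whether the exact name string is in the registry of the process that receives the message.</p>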