我试图在Flask路由中启动一个异步任务,并使用JS轮询任务状态位置,在任务完成后检索结果。任务显示为“挂起”,从未成功或失败
当我运行celery -A app worker --loglevel=debug
时,我得到了这样的结果:error: ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'.
我已经做了很多搜索,但还没有找到原因。下面是一个简单的例子
...
from app.celery_tasks import *
# This route initiates a test
@main_bp.route('/test_celery')
def test_celery_route():
run_test_url = url_for('main_bp.run_test_task', _external=True)
response = requests.post(run_test_url)
test_task_url = response.headers.get('Location')
return render_template('main/celery_test_page.html', test_task_url=test_task_url)
# This endpoint receives the request and kicks of a celery task
@main_bp.route('/run_test_task', methods=['POST'])
def run_test_task():
task = test_task.apply_async()
return jsonify({}), 202, {'Location': url_for('main_bp.test_task_status',
task_id=task.id)}
# The task result lives here
@main_bp.route('/test_task_status/<task_id>')
def test_task_status(task_id):
task = test_task.AsyncResult(task_id)
response = {
'state': task.state
}
return jsonify(response)
{% extends 'base.html' %}
{% block app_content %}
<h1>Test Page</h1>
<div id="test-div">
</div>
{% endblock app_content %}
{% block scripts %}
{{ super() }}
<script>
var test_task_url = "{{ test_task_url }}";
console.log(test_task_url);
update_test_results(test_task_url);
function update_test_results(status_url) {
// send GET request to status URL
$.getJSON(status_url, function(data) {
console.log(data);
console.log('Status Update function triggered')
console.log(data['state'])
if (data['state'] == 'SUCCESS'){
console.log('Task succeeded')
$("#test-div").append('<p>Task <b>Succeeded</b></p>')
}
else if (data['state'] == 'FAILURE') {
console.log('Task failed')
$("#test-div").append('<p>Task <b>Failed</b></p>')
}
else if (data['state'] == 'PENDING') {
console.log('Task is pending')
$("#test-div").append('<p>Task is <b>Pending</b></p>')
}
else {
console.log('Other state found')
// rerun in 2 seconds
setTimeout(function() {
var status_url = "{{ test_task_url }}";
update_test_results(status_url);
}, 2000);
}
});
};
</script>
{% endblock scripts %}
我认为上面的代码是可以的,但我还是要发布它来增加上下文
app/celery_tasks.py
...
from . import make_celery
@celery.task(bind=True)
def test_task(self):
time.sleep(5)
return 1
app/init.py
...
from celery import Celery
def make_celery(app=None):
app = app or create_app()
celery = Celery(
app.import_name,
backend=app.config['CELERY_BACKEND_URL'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
celery.app = app
return celery
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(Config)
celery.conf.update(app.config)
...
config.py 请注意,它与应用程序目录位于同一级别,而不是在应用程序目录中
class Config(object):
...
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_BACKEND_URL = 'redis://localhost:6379/0'
因此,在这个设置中,当我运行test
时,我看到:
[2021-05-15 07:43:00,384: DEBUG/MainProcess] | Worker: Starting Hub
[2021-05-15 07:43:00,384: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,384: DEBUG/MainProcess] | Worker: Starting Pool
[2021-05-15 07:43:00,899: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,900: DEBUG/MainProcess] | Worker: Starting Consumer
[2021-05-15 07:43:00,901: DEBUG/MainProcess] | Consumer: Starting Connection
[2021-05-15 07:43:00,920: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-05-15 07:43:00,920: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,921: DEBUG/MainProcess] | Consumer: Starting Events
[2021-05-15 07:43:00,931: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,931: DEBUG/MainProcess] | Consumer: Starting Heart
[2021-05-15 07:43:00,934: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,934: DEBUG/MainProcess] | Consumer: Starting Mingle
[2021-05-15 07:43:00,934: INFO/MainProcess] mingle: searching for neighbors
[2021-05-15 07:43:01,961: INFO/MainProcess] mingle: all alone
[2021-05-15 07:43:01,961: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,961: DEBUG/MainProcess] | Consumer: Starting Tasks
[2021-05-15 07:43:01,965: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,965: DEBUG/MainProcess] | Consumer: Starting Control
[2021-05-15 07:43:01,968: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,968: DEBUG/MainProcess] | Consumer: Starting Gossip
[2021-05-15 07:43:01,971: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,971: DEBUG/MainProcess] | Consumer: Starting event loop
[2021-05-15 07:43:01,971: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2021-05-15 07:43:01,972: INFO/MainProcess] celery@macbook-pro.lan ready.
[2021-05-15 07:43:01,972: DEBUG/MainProcess] basic.qos: prefetch_count->32
然后,当我加载路由(/test\u芹菜)时,我得到以下错误:
[2021-05-15 07:43:06,717: ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
File "/Users/me/soul/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 555, in on_task_received
strategy = strategies[type_]
KeyError: 'app.celery_tasks.test_task'
我检查了此错误中引用的文档,但仍然无法解决此问题
在
app/__init__.py
中,您无条件地创建一个芹菜实例,在调用make_celery()
时创建第二个芹菜实例。这是你所看到的问题的偶然现象,但确实表明结构混乱这里,你看到的问题始于
以及导入
app
时会发生什么(或者更重要的是,不会发生什么)表示导入
app
不足以导致app.celery_tasks
被导入。如果您在那里导入蓝图,我怀疑它是从create_app()
内部间接导入的,但是当芹菜导入app
时create_app()
没有被调用,因此芹菜看不到您的任务考虑应用程序周围的包装器。让我们称之为
myapp.py
然后将
make_celery()
的大部分移动到create_app()
启动应用程序时,
FLASK_APP=myapp flask run
将使用myapp.app
,该实例是Flask实例,并且celery -A myapp.celery
将在运行create_app()
后拾取芹菜实例,该实例导入蓝图,导入app.celery_tasks
作为副作用这种方法的好处是webapp和芹菜工作者实例之间的设置是相同的,这避免了一系列难以诊断的问题
我有一个工作示例here,您可以从中借用
我通过将
celery -A app worker
更改为celery -A app.celery_tasks worker
解决了这个问题。更改后,我可以看到Cellery_tasks.py中的任务已注册相关问题 更多 >
编程相关推荐