基本芹菜任务未运行(错误/MainProcess]接收到类型为“app.Celery\u tasks.test\u task”的未注册任务)

2024-10-17 06:18:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在Flask路由中启动一个异步任务,并使用JS轮询任务状态位置,在任务完成后检索结果。任务显示为“挂起”,从未成功或失败

当我运行celery -A app worker --loglevel=debug时,我得到了这样的结果:error: ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'.我已经做了很多搜索,但还没有找到原因。下面是一个简单的例子

以下是基本流程: enter image description here

  1. routes.py
...
from app.celery_tasks import *

# This route initiates a test
@main_bp.route('/test_celery')
def test_celery_route():
    run_test_url = url_for('main_bp.run_test_task', _external=True)
    response = requests.post(run_test_url)
    test_task_url = response.headers.get('Location')

    return render_template('main/celery_test_page.html', test_task_url=test_task_url)
  1. 测试url端点和任务状态

# This endpoint receives the request and kicks of a celery task
@main_bp.route('/run_test_task', methods=['POST'])
def run_test_task():

    task = test_task.apply_async()

    return jsonify({}), 202, {'Location': url_for('main_bp.test_task_status',
                                                  task_id=task.id)}

# The task result lives here
@main_bp.route('/test_task_status/<task_id>')
def test_task_status(task_id):
    task = test_task.AsyncResult(task_id)
    response = {
        'state': task.state
        }

    return jsonify(response)
  1. test_芹菜.html:html+JS
{% extends 'base.html' %}

{% block app_content %}
<h1>Test Page</h1>

<div id="test-div">
    </div>
{% endblock app_content %}


{% block scripts %}

{{ super() }}

<script>
    var test_task_url = "{{ test_task_url }}";
    console.log(test_task_url);

    update_test_results(test_task_url);

    function update_test_results(status_url) {
        // send GET request to status URL
        $.getJSON(status_url, function(data) {
            console.log(data);
            console.log('Status Update function triggered')
            console.log(data['state'])
            if (data['state'] == 'SUCCESS'){
                console.log('Task succeeded')
                $("#test-div").append('<p>Task <b>Succeeded</b></p>')
                }

            else if (data['state'] == 'FAILURE') {
                console.log('Task failed')
                $("#test-div").append('<p>Task <b>Failed</b></p>')
            }

            else if (data['state'] == 'PENDING') {
                console.log('Task is pending')
                $("#test-div").append('<p>Task is <b>Pending</b></p>')
            }

            else {
                console.log('Other state found')
                // rerun in 2 seconds
                setTimeout(function() {
                    var status_url = "{{ test_task_url }}";
                    update_test_results(status_url);
                }, 2000);
            }
        });
    };

</script>

{% endblock scripts %}

我认为上面的代码是可以的,但我还是要发布它来增加上下文

app/celery_tasks.py

...
from . import make_celery

@celery.task(bind=True)
def test_task(self):
    time.sleep(5)

    return 1

app/init.py

...
from celery import Celery

def make_celery(app=None):
    app = app or create_app()
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_BACKEND_URL'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    celery.app = app

    return celery

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)

def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(Config)

    celery.conf.update(app.config)
...

config.py 请注意,它与应用程序目录位于同一级别,而不是在应用程序目录中

class Config(object):
    ...
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_BACKEND_URL = 'redis://localhost:6379/0'

因此,在这个设置中,当我运行test时,我看到:

[2021-05-15 07:43:00,384: DEBUG/MainProcess] | Worker: Starting Hub
[2021-05-15 07:43:00,384: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,384: DEBUG/MainProcess] | Worker: Starting Pool
[2021-05-15 07:43:00,899: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,900: DEBUG/MainProcess] | Worker: Starting Consumer
[2021-05-15 07:43:00,901: DEBUG/MainProcess] | Consumer: Starting Connection
[2021-05-15 07:43:00,920: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-05-15 07:43:00,920: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,921: DEBUG/MainProcess] | Consumer: Starting Events
[2021-05-15 07:43:00,931: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,931: DEBUG/MainProcess] | Consumer: Starting Heart
[2021-05-15 07:43:00,934: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,934: DEBUG/MainProcess] | Consumer: Starting Mingle
[2021-05-15 07:43:00,934: INFO/MainProcess] mingle: searching for neighbors
[2021-05-15 07:43:01,961: INFO/MainProcess] mingle: all alone
[2021-05-15 07:43:01,961: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,961: DEBUG/MainProcess] | Consumer: Starting Tasks
[2021-05-15 07:43:01,965: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,965: DEBUG/MainProcess] | Consumer: Starting Control
[2021-05-15 07:43:01,968: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,968: DEBUG/MainProcess] | Consumer: Starting Gossip
[2021-05-15 07:43:01,971: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,971: DEBUG/MainProcess] | Consumer: Starting event loop
[2021-05-15 07:43:01,971: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2021-05-15 07:43:01,972: INFO/MainProcess] celery@macbook-pro.lan ready.
[2021-05-15 07:43:01,972: DEBUG/MainProcess] basic.qos: prefetch_count->32

然后,当我加载路由(/test\u芹菜)时,我得到以下错误:

[2021-05-15 07:43:06,717: ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
  File "/Users/me/soul/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 555, in on_task_received
    strategy = strategies[type_]
KeyError: 'app.celery_tasks.test_task'

我检查了此错误中引用的文档,但仍然无法解决此问题


Tags: debugtestlogappurltaskconsumerstatus
2条回答

app/__init__.py中,您无条件地创建一个芹菜实例,在调用make_celery()时创建第二个芹菜实例。这是你所看到的问题的偶然现象,但确实表明结构混乱

这里,你看到的问题始于

celery -A app worker

以及导入app时会发生什么(或者更重要的是,不会发生什么)

error: ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'

表示导入app不足以导致app.celery_tasks被导入。如果您在那里导入蓝图,我怀疑它是从create_app()内部间接导入的,但是当芹菜导入appcreate_app()没有被调用,因此芹菜看不到您的任务

考虑应用程序周围的包装器。让我们称之为myapp.py

from app import celery, create_app

app = create_app()

然后将make_celery()的大部分移动到create_app()

启动应用程序时,FLASK_APP=myapp flask run将使用myapp.app,该实例是Flask实例,并且celery -A myapp.celery将在运行create_app()后拾取芹菜实例,该实例导入蓝图,导入app.celery_tasks作为副作用

这种方法的好处是webapp和芹菜工作者实例之间的设置是相同的,这避免了一系列难以诊断的问题

我有一个工作示例here,您可以从中借用

我通过将celery -A app worker更改为celery -A app.celery_tasks worker解决了这个问题。更改后,我可以看到Cellery_tasks.py中的任务已注册

相关问题 更多 >