如何在芹菜中列出排队的物品?

2024-05-06 17:48:24 发布

您现在位置:Python中文网/ 问答频道 /正文

我在Ubuntu EC2节点上有一个Django项目,我一直在使用它来设置一个使用Celery的异步。

我正在跟踪http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/和文档。

我已经能够在命令行执行基本任务,使用:

(env1)ubuntu@ip-172-31-22-65:~/projects/tp$ celery --app=myproject.celery:app worker --loglevel=INFO

我刚刚意识到,我的队列中有很多任务没有执行:

[2015-03-28 16:49:05,916: WARNING/MainProcess] Restoring 4 unacknowledged message(s).
(env1)ubuntu@ip-172-31-22-65:~/projects/tp$ celery -A tp purge
WARNING: This will remove all tasks from queue: celery.
         There is no undo for this operation!

(to skip this prompt use the -f option)

Are you sure you want to delete all tasks (yes/NO)? yes
Purged 81 messages from 1 known task queue.

如何从命令行获取排队项的列表?


Tags: 命令行fromipappfortaskqueueubuntu
2条回答

如果你想得到所有预定的任务

celery inspect scheduled

查找所有活动队列

celery inspect active_queues

为了地位

celery inspect stats

对于所有命令

celery inspect

如果您想明确地得到它,因为您使用redis作为队列。那么

redis-cli

>KEYS * #find all keys

然后找出与celery有关的东西

>LLEN KEY # i think it gives length of list

以下是Redis的复制粘贴解决方案:

def get_celery_queue_len(queue_name):
    from yourproject.celery import app as celery_app
    with celery_app.pool.acquire(block=True) as conn:
        return conn.default_channel.client.llen(queue_name)


def get_celery_queue_items(queue_name):
    import base64
    import json
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)

    decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

它和Django一起工作。别忘了改变yourproject.celery

相关问题 更多 >