Celery未将消息发布到基于非任务的RabbitMQ队列

2024-10-02 08:19:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试使用芹菜将Django Rest框架后端连接到RabbitMQ。我有一些微服务使用RabbitMQ作为后端和这些服务之间的消息总线。当后端调用一个将消息推送到micro services消息总线的任务时,消息永远不会被放入队列中,以便其中一个服务获取。我在my tasks.py中声明的队列是在RabbitMQ中创建的,但没有被任何生产者使用

test芹菜/芹菜.py

...
from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "testcelery.settings")
app = Celery("testcelery")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

test芹菜/settings.py

...

installed_apps = [
...
"backend_processing"
]

CELERY_BROKER_URL = "amqp://testuser@rabbitmq_host:5672"
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"

后端处理/任务.py

...
from celery import shared_task
from testcelery.celery import app as celery_app
from kombu import Exchange, Queue

test_queue = Queue("test_queue", Exchange("test"), "test", durable=True)

@shared_task
def run_processing(data, producer=None):
    with celery_app.producer_or_acquire(producer) as producer:
        producer.publish(data,
                         declare=[test_queue],
                         retry=True,
                         exchange=test_queue.exchange,
                         routing_key=test_queue.routing_key,
                         delivery_mode="persistent")

我需要在配置中修改什么,以允许芹菜发布到settings.py提供给芹菜的队列之外的队列


Tags: producerfrompytestimportapp消息settings

热门问题