使用bin/芹菜工人消费香草amqp消息的工具(即,不是芹菜任务)
celery-message-consumer的Python项目详细描述
用法
pip install celery-message-consumer
处理程序
在代码中,可以通过装饰python来定义消息处理程序 功能,与芹菜任务几乎相同:
fromevent_consumerimportmessage_handler@message_handler('my.routing.key')defprocess_message(body):# `body` has been deserialized for us by the Celery workerprint(body)@message_handler(['my.routing.key1','my.routing.key2'])defprocess_messages(body):# you can register handler for multiple routing keys@message_handler('my.routing.*')defprocess_all_messages(body):# or wildcard routing keys, if using a 'topic' exchange
像芹菜任务一样,它定义的模块必须实际得到 在某个点导入,以便注册处理程序。
将创建一个队列(实际上,三个队列-见下文)来接收 与路由密钥匹配的消息。
芹菜
在代码的其他地方,您需要实例化一个芹菜应用程序,然后 应用我们的自定义"consumerstep",它将我们的消息处理程序挂接到 工人。如果您已经在项目中使用芹菜 作为芹菜 那么你可能需要为任务和 消息使用者。
fromceleryimportCeleryfromevent_consumer.handlersimportAMQPRetryConsumerStepmain_app=Celery()consumer_app=Celery()consumer_app.steps['consumer'].add(AMQPRetryConsumerStep)
您可能需要为每个应用程序分别配置。见 芹菜文档
在消息使用者应用程序的配置中,添加包含 您的修饰消息处理函数可以精确地导入芹菜 就像你做芹菜一样-这保证了它们被进口 在工人启动时注册。
然后从命令行运行芹菜工人,就像平常一样 会,连接到消费者应用程序:
bin/celery worker -A myproject.mymodule:consumer_app
配置
设置主要通过python文件进行配置,例如 作为您现有的django settings.py 或celeriy celleryconfig.py 。 要引导它,有几个env变量来控制如何配置 已加载:
- 事件消费者应用程序配置 应该是python模块的导入路径,例如: 事件消费者应用程序配置=django.conf.settings
- 事件消费者配置名称空间 设置用于从env和 配置文件。默认为 事件消费者
有关详细信息,请参见 事件消费者/conf/ 的来源。
一些有用的配置键(它们都以前缀 事件消费者默认值:
- 序列化程序 这是芹菜序列化程序的名称,例如。 'json' 。使用者将只接受在此中序列化的消息 格式:
- 队列名称前缀 如果使用默认队列名称(路由密钥),则 此前缀将添加到队列名称中。如果你提供一个定制 处理程序装饰器中的队列名称将不应用前缀。
- 最大重试次数 默认为 4 (即. 1次尝试+4次重试=5 罢工)
- backoff_func 接受一个函数 (int) ->; float 返回 基于当前重试计数器的重试延迟(秒) 消息。
- 存档过期时间 "存档"队列,之后Exchange将删除它们。默认值 至24天。
- 如果消息处理程序使用 Django DB连接,以便工作进程能够处理 可怕的"当前事务已中止" 错误并继续。
- 如果需要消息处理程序连接 排队到特定的交换机,然后您可以提供如下命令:
EXCHANGES={# a reference name for this config, used when attaching handlers'default':{'name':'data',# actual name of exchange in RabbitMQ'type':'topic',# an AMQP exchange type},'other':{...},...}
默认情况下, 'default' 配置将被使用。你可以附加 装饰时特定交换的处理程序:
@message_handler('my.routing.key',exchange='other')defprocess_message(body):pass