使用bin/芹菜工人消费香草amqp消息的工具(即,不是芹菜任务)

celery-message-consumer的Python项目详细描述


用法

pip install celery-message-consumer

处理程序

在代码中,可以通过装饰python来定义消息处理程序 功能,与芹菜任务几乎相同:

fromevent_consumerimportmessage_handler@message_handler('my.routing.key')defprocess_message(body):# `body` has been deserialized for us by the Celery workerprint(body)@message_handler(['my.routing.key1','my.routing.key2'])defprocess_messages(body):# you can register handler for multiple routing keys@message_handler('my.routing.*')defprocess_all_messages(body):# or wildcard routing keys, if using a 'topic' exchange

像芹菜任务一样,它定义的模块必须实际得到 在某个点导入,以便注册处理程序。

将创建一个队列(实际上,三个队列-见下文)来接收 与路由密钥匹配的消息。

芹菜

在代码的其他地方,您需要实例化一个芹菜应用程序,然后 应用我们的自定义"consumerstep",它将我们的消息处理程序挂接到 工人。如果您已经在项目中使用芹菜 作为芹菜 那么你可能需要为任务和 消息使用者。

fromceleryimportCeleryfromevent_consumer.handlersimportAMQPRetryConsumerStepmain_app=Celery()consumer_app=Celery()consumer_app.steps['consumer'].add(AMQPRetryConsumerStep)

您可能需要为每个应用程序分别配置。见 芹菜文档

在消息使用者应用程序的配置中,添加包含 您的修饰消息处理函数可以精确地导入芹菜 就像你做芹菜一样-这保证了它们被进口 在工人启动时注册。

然后从命令行运行芹菜工人,就像平常一样 会,连接到消费者应用程序:

bin/celery worker -A myproject.mymodule:consumer_app

配置

设置主要通过python文件进行配置,例如 作为您现有的django settings.py 或celeriy celleryconfig.py 。 要引导它,有几个env变量来控制如何配置 已加载:

  • 事件消费者应用程序配置 应该是python模块的导入路径,例如: 事件消费者应用程序配置=django.conf.settings
  • 事件消费者配置名称空间 设置用于从env和 配置文件。默认为 事件消费者

有关详细信息,请参见 事件消费者/conf/ 的来源。

一些有用的配置键(它们都以前缀 事件消费者默认值:

  • 序列化程序 这是芹菜序列化程序的名称,例如。 'json' 。使用者将只接受在此中序列化的消息 格式:
  • 队列名称前缀 如果使用默认队列名称(路由密钥),则 此前缀将添加到队列名称中。如果你提供一个定制 处理程序装饰器中的队列名称将不应用前缀。
  • 最大重试次数 默认为 4 (即. 1次尝试+4次重试=5 罢工)
  • backoff_func 接受一个函数 (int) ->; float 返回 基于当前重试计数器的重试延迟(秒) 消息。
  • 存档过期时间 "存档"队列,之后Exchange将删除它们。默认值 至24天。
  • 如果消息处理程序使用 Django DB连接,以便工作进程能够处理 可怕的"当前事务已中止" 错误并继续。
  • 如果需要消息处理程序连接 排队到特定的交换机,然后您可以提供如下命令:
EXCHANGES={# a reference name for this config, used when attaching handlers'default':{'name':'data',# actual name of exchange in RabbitMQ'type':'topic',# an AMQP exchange type},'other':{...},...}

默认情况下, 'default' 配置将被使用。你可以附加 装饰时特定交换的处理程序:

@message_handler('my.routing.key',exchange='other')defprocess_message(body):pass

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
验证Java验证用户输入的一系列数字   java如何在SpringBoot中字段验证失败时在ConstraintViolationException中获取RequestParam名称   java如何解决Vertx阻塞DNS问题   java意外类型平均值   java如何将dataframe的UUID列转换为包含相同十六进制序列的简单字符串?   身份散列映射的java用法   java无法在eclipse中导入现有项目   进程从运行的java程序中获取CPU号   java将文本视图的特定行滚动到顶部在最后一个屏幕上不起作用   无法初始化java SpringSecurityFilterChain   java当我在项目中使用volatile时,为什么下面的代码显示不同的结果?   是否有转换java的标准方法。util。函数,消费者<T>转换为java。util。作用函数<T,Void>   java nginx分块传输编码失败   java如何将几个IF转换为一个循环   java URI从路径中删除/删除