使用RabbitMQ等代理构建管道的消息交换引擎

barterdude的Python项目详细描述


调酒师

Build StatusCoverage Status

消息交换引擎使用RabbitMQ这样的代理来构建管道。这个项目是建立在伟大的async-worker之上的。在

barter

安装

使用Python3.6+

pip install barterdude

使用

使用以下完整示例构建您的消费者:

^{pr2}$

建立自己的钩子

底座挂钩(简单型)

这些钩子在消息检索、成功和失败时调用。在

frombarterdude.hooksimportBaseHookfromasyncworker.rabbitmq.messageimportRabbitMQMessageclassMyCounterHook(BaseHook):_consume=_fail=_success=0asyncdefon_success(self,message:RabbitMQMessage):self._success+=1asyncdefon_fail(self,message:RabbitMQMessage,error:Exception):self._fail+=1asyncdefbefore_consume(self,message:RabbitMQMessage):self._consume+=1

Http Hook(开放路由)

这些钩子可以完成简单钩子所做的所有事情,但对路由做出响应。在

fromaiohttpimportwebfrombarterdude.hooksimportHttpHookfromasyncworker.rabbitmq.messageimportRabbitMQMessageclassMyCounterHttpHook(HttpHook):_consume=_fail=_success=0asyncdef__call__(self,req:web.Request):returnweb.json_response(dict(consumed=self._consume,success=self._success,fail=self._fail))asyncdefon_success(self,message:RabbitMQMessage):self._success+=1asyncdefon_fail(self,message:RabbitMQMessage,error:Exception):self._fail+=1asyncdefbefore_consume(self,message:RabbitMQMessage):self._consume+=1

数据共享

遵循async-workeraiohttp中的方法, BarterDude不鼓励使用全局变量,也就是单例变量。在

要在应用程序中全局共享数据状态,BarterDude的行为类似于dict。 例如,可以在BarterDude实例中保存类似全局的变量:

frombarterdudeimportBarterDudebarterdude=BarterDude()baterdude["my_variable"]=data

把它带回消费者那里

asyncdefconsumer_access_storage(msg):data=baterdude["my_variable"]

模式验证

使用的消息可以通过json架构进行验证:

@barterdude.consume_amqp(["queue1","queue2"],monitor,validation_schema={"$schema":"http://json-schema.org/draft-04/schema#","$id":"http://example.com/example.json","type":"object","title":"Message Schema","description":"The root schema comprises the entire JSON document.","additionalProperties":True,"required":["key"],"properties":{"key":{"$id":"#/properties/key","type":"string","title":"The Key Schema","description":"An explanation about message.","default":""}}},requeue_on_validation_fail=False# invalid messages are removed without requeue)

您仍然可以在生成消息之前或在需要时验证消息:

frombarterdude.messageimportMessageValidationvalidator=MessageValidation(json_schema)validator.validate(message)

数据保护

Barterdude考虑了GDPR数据保护,默认情况下不记录消息体,但是您可以使用环境变量BARTERDUDE_LOG_REDACTED=0停用它

现在消息体将被日志钩子记录。在

此配置只控制BarterDude的默认日志挂钩,对用户自定义用户日志没有影响。如果要使用此配置控制日志,请使用:

frombaterdude.confimportBARTERDUDE_LOG_REDACTED

测试

为了测试异步使用者,我们建议使用asynctestlib

fromasynctestimportTestCaseclassTestMain(TestCase):deftest_should_pass(self):self.assertTrue(True)

我们希望你喜欢!:传情动漫:

贡献

对于开发和贡献,请遵循Contributing GuideALWAYS尊重Code of Conduct

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Kafka producer大量内存使用(泄漏?)   java NullPointerException。。。正在插入数据但无法检索数据[Mysql DB]   java spring+jpa+hibernate=没有可用于当前线程的实际事务的EntityManager无法可靠地处理“persist”调用   getelementbyid在没有ID的情况下如何在java中使用GetElementsById   java有没有一种使用WatchService强制轮询的方法?   java将值从jframe传递给另一个jframe并使用它   Java/Groovy中带重试的反应式事件处理   具有两个包装器元素的java Jackson XML ArrayList输出   java总是在范围内使用不同的随机元素   取消选择java下拉列表值   多线程如何在Java中为对象的不同成员拥有不同的同步块   java如何使用多线程从文本文件中读取输入   java Spring启动附加崩溃命令   java使用公共或单独的actionPerfomed方法有什么区别   java用Spring3.0中的SpEL替换JSP中的EL   java作为windows服务运行应用程序时无法访问共享文件夹   java xml 1.1规范中的“解析数据”是什么意思?   以编程方式设置JComboBox索引时java触发ItemListener   java Android WebView:只加载HTML,不加载JS或CSS(在某些设备中)   Java:计算do/while循环的数量