面向对象的amqp层,用于微服务通信。

amqp-ko的Python项目详细描述


amqp k_

面向对象的amqp层,用于微服务通信。

用法

建议使用amqp k_的方法是创建自己的队列对象。最简单的方法是使用createQueue函数。

创建队列

fromamqp_koimportcreate_queue,AsyncConnection,Message,MessageGatefromdataclassesimportdataclass@dataclass(frozen=True)classTopicFollow(Message):user_id:inttopic_name:strdefunmarshal_topic_follow(data:dict)->TopicFollow:returnTopicFollow(user_id=data["user_id"],topic_name=data["topic_name"],)message_gates=[MessageGate("topic_follow",TopicFollow,unmarshal_topic_follow),]asyncwithAsyncConnection("localhost",5672,"rabbitmq","rabbitmq")asconnection:queue=awaitcreate_queue(connection,"exchange-name",message_gates)

使用消息

fromamqp_koimportConsumer,JobclassConnectUserWithTopic(Consumer):asyncdefconsume(self,job:Job):# Put here some code to connect user with a topic# using "job.message.userId" and "job.message.topicName"awaitjob.ack()awaitqueue.consume("queue-name",{TopicFollow:ConnectUserWithTopic()},)

生成消息

message=TopicFollow(120,"entertainment")awaitqueue.produce(message)

安装

pip install amqp-ko

作者:Michał Budziak

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java为什么我不能从文件中读取字符串?   如何在java中以相同的方式洗牌两个列表?   java如何使用WebFilter实现授权头检查   如何使用Java正确显示谷歌云存储中的日文字符?   使用Smook和Freemarker将Java对象转换为XML的模型是什么?   使用hibernate工具(Jboss)社区生成hibernate映射文件时发生java错误   java Android在运行任务时更新UI?   单元测试Powermockito/Java间谍类以验证私有方法调用   java libgdx在另一个上渲染srite会导致背景精灵消失   web服务如何在Java中创建这样的SOAP请求?   java文本视图把阿拉伯语字母弄乱了   java中的socket不活动超时   java Android Studio“未找到Android API平台30的源代码”   java Spring引导返回嵌套数组,但是。。。我需要数据,但只需要一次   java订阅doOnNext方法中的另一个可观察对象