豆宝阿里云RocketMQ Python SDK

doubao-aliyun-mq-sdk的Python项目详细描述


DOUBAO-ALIYUN-MQ-SDK

豆包阿里云rocketMQ SDK

安装

pip3 install doubao-aliyun-mq-sdk

或者

python3 setup.py install

使用

fromdoubao_aliyun_mqimportClientasAliyunMQClientaliyun_mq_client=AliyunMQClient(<http_endpoint>,<access_key>,<secret_key>)# 获取消费者实例consumer=aliyun_mq_client.get_consumer(<instance_id>,<topic_name>,<group_id>)# 获取生产者实例producer=aliyun_mq_client.get_producer(<instance_id>,<topic_name>)

生产者(producer)

producer.send

发送消息

  • msg: 消息内容, 类型str
  • tag: 消息标签, 默认 ''
  • properties: 属性, 类型dict, 默认 None
  • start_deliver_time: 定时消息,毫秒级绝对时间,默认 None

producer.send_json

发送json格式的消息

  • msg: 消息内容(json), 类型dict
  • tag: 消息标签, 默认 ''
  • properties: 属性, 类型dict, 默认 None
  • start_deliver_time: 定时消息,毫秒级绝对时间,默认 None

消费者(consumer)

consumer.receive

接收消息

  • batch: 一次最多接收条数(最多可设置为16条), 默认 1
  • wait_seconds: 长轮询时间(最多可设置为30秒), 默认 3

consumer.ack_message

确认消息

  • recv_msgs: 接收到的消息列表, 类型 list

consumer.consume

使用with语句实现消费消息(接收并确认)

  • batch: 一次最多接收条数(最多可设置为16条), 默认 1
  • wait_seconds: 长轮询时间(最多可设置为30秒), 默认 3

consumer.consume_decorator

消费装饰器,被装饰的函数第一个参数返回接收到的消息泪飙

  • batch: 一次最多接收条数(最多可设置为16条), 默认 1
  • wait_seconds: 长轮询时间(最多可设置为30秒), 默认 3

示例

fromdoubao_configimportClientasConfigClientimporttimefromdoubao_aliyun_mqimportClientasAliyunMQClient# 配合doubao-config使用config_client=ConfigClient(<config_host>,<config_username>,<config_password>)config=config_client.get_config(<application>,<profile>)aliyun_mq_client=AliyunMQClient(config['base.comm.rocket-mq.onsAddr.digital.http'],config['base.rocket-mq.accessKeyId'],config['base.rocket-mq.accessKeySecret'])# 获取消费者实例consumer=aliyun_mq_client.get_consumer(config['base.comm.rocket-mq.digital.id'],<topic_name>,<group_id>)# 获取生产者实例producer=aliyun_mq_client.get_producer(config['base.comm.rocket-mq.digital.id'],<topic_name>)size=10# 发送消息foriinrange(size):msg='test %d'%iproducer.send(msg)print('send:',msg)# 接收消息msgs=consumer.receive(batch=size)ifmsgs:formsginmsgs:print('receive:',msg.message_id,msg.message_body,msg.message_tag)# 确认消息消费成功consumer.ack_message(msgs)print('ack message:',msgs)# 发送消息(json)foriinrange(size):msg={'test':1}producer.send_json(msg)print('send:',{'test':1})# 消费消息(接收消息并确认)foriinrange(size):withconsumer.consume()asmsgs:ifmsgs:formsginmsgs:print('consume:',msg.message_id,msg.message_body,msg.message_tag)# 发送消息(json)带标签foriinrange(size):msg={'test':1}producer.send_json(msg,tag='ttt')print('send:',{'test':1})# 消费消息(装饰器)@consumer.consume_decorator(batch=10)deftest_func(msgs,*args,**kwargs):print('decorator consume:',msgs)formsginmsgs:print('--- message_id',msg.message_id)print('--- message_tag',msg.message_tag)print('--- message_body',msg.message_body)print('--- publish_time',msg.publish_time)print('--- consumed_times',msg.consumed_times)print('--- next_consume_time',msg.next_consume_time)foriinrange(size):test_func()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java从Dropwizard中的Minio检索文件时,GET请求中的超时是如何处理的?   带Hibernate的java Jackson用于序列化以避免枚举   Raspberry Pi上的java Jave分段错误   java在屏幕旋转时不保存当前片段和数据   java War文件未在Heroku上正确部署   如何使用Java处理Selenium webdriver中的促销广告或cookie   java处理“用法:PApplet[options]<classname>[sketch args]”   java文本文件错误扫描程序   运行第一个JavaFX模块化程序时出现java异常   java将fileoutputstream转换为字符串   如何调试gstreamerjava?   java Spring RestTemplate ResponseBody类是什么样的   如何将JSON数组转换为Java列表。我在用斯文森   javascript在显示div按钮后进入新页面