豆宝阿里云RocketMQ Python SDK
doubao-aliyun-mq-sdk的Python项目详细描述
DOUBAO-ALIYUN-MQ-SDK
豆包阿里云rocketMQ SDK
安装
pip3 install doubao-aliyun-mq-sdk
或者
python3 setup.py install
使用
fromdoubao_aliyun_mqimportClientasAliyunMQClientaliyun_mq_client=AliyunMQClient(<http_endpoint>,<access_key>,<secret_key>)# 获取消费者实例consumer=aliyun_mq_client.get_consumer(<instance_id>,<topic_name>,<group_id>)# 获取生产者实例producer=aliyun_mq_client.get_producer(<instance_id>,<topic_name>)
生产者(producer)
producer.send
发送消息
msg
: 消息内容, 类型strtag
: 消息标签, 默认 ''properties
: 属性, 类型dict, 默认 Nonestart_deliver_time
: 定时消息,毫秒级绝对时间,默认 None
producer.send_json
发送json格式的消息
msg
: 消息内容(json), 类型dicttag
: 消息标签, 默认 ''properties
: 属性, 类型dict, 默认 Nonestart_deliver_time
: 定时消息,毫秒级绝对时间,默认 None
消费者(consumer)
consumer.receive
接收消息
batch
: 一次最多接收条数(最多可设置为16条), 默认 1wait_seconds
: 长轮询时间(最多可设置为30秒), 默认 3
consumer.ack_message
确认消息
recv_msgs
: 接收到的消息列表, 类型 list
consumer.consume
使用with语句实现消费消息(接收并确认)
batch
: 一次最多接收条数(最多可设置为16条), 默认 1wait_seconds
: 长轮询时间(最多可设置为30秒), 默认 3
consumer.consume_decorator
消费装饰器,被装饰的函数第一个参数返回接收到的消息泪飙
batch
: 一次最多接收条数(最多可设置为16条), 默认 1wait_seconds
: 长轮询时间(最多可设置为30秒), 默认 3
示例
fromdoubao_configimportClientasConfigClientimporttimefromdoubao_aliyun_mqimportClientasAliyunMQClient# 配合doubao-config使用config_client=ConfigClient(<config_host>,<config_username>,<config_password>)config=config_client.get_config(<application>,<profile>)aliyun_mq_client=AliyunMQClient(config['base.comm.rocket-mq.onsAddr.digital.http'],config['base.rocket-mq.accessKeyId'],config['base.rocket-mq.accessKeySecret'])# 获取消费者实例consumer=aliyun_mq_client.get_consumer(config['base.comm.rocket-mq.digital.id'],<topic_name>,<group_id>)# 获取生产者实例producer=aliyun_mq_client.get_producer(config['base.comm.rocket-mq.digital.id'],<topic_name>)size=10# 发送消息foriinrange(size):msg='test %d'%iproducer.send(msg)print('send:',msg)# 接收消息msgs=consumer.receive(batch=size)ifmsgs:formsginmsgs:print('receive:',msg.message_id,msg.message_body,msg.message_tag)# 确认消息消费成功consumer.ack_message(msgs)print('ack message:',msgs)# 发送消息(json)foriinrange(size):msg={'test':1}producer.send_json(msg)print('send:',{'test':1})# 消费消息(接收消息并确认)foriinrange(size):withconsumer.consume()asmsgs:ifmsgs:formsginmsgs:print('consume:',msg.message_id,msg.message_body,msg.message_tag)# 发送消息(json)带标签foriinrange(size):msg={'test':1}producer.send_json(msg,tag='ttt')print('send:',{'test':1})# 消费消息(装饰器)@consumer.consume_decorator(batch=10)deftest_func(msgs,*args,**kwargs):print('decorator consume:',msgs)formsginmsgs:print('--- message_id',msg.message_id)print('--- message_tag',msg.message_tag)print('--- message_body',msg.message_body)print('--- publish_time',msg.publish_time)print('--- consumed_times',msg.consumed_times)print('--- next_consume_time',msg.next_consume_time)foriinrange(size):test_func()
- 项目
标签: