python或rocketmq api
py-rmq的Python项目详细描述
ali rocketMQ for python
说明
官方so路径要加到LD_LIBRARY_PATH中
调用示例
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Author : HaiFeng# @Email : 24918700@qq.com# @Time : 2019/1/16# @desc : demo ali rocketmqimportsysimporttimefrompy_rmq.rmq_consumerimportConsumerfrompy_rmq.rmq_producerimportProducerfrompy_rmq.rmq_consumerimportOrderConsumerfrompy_rmq.rmq_producerimportOrderProducerdefOnConsumer(topic:str,tag:str,key:str,id:str,deliver_time:int,body:str,reconsume_times:int,store_time:int,offset:int):print(body)if__name__=='__main__':t:str=''id:str=''access:str=''secret:str=''topic:str=''iflen(sys.argv)>1:t=sys.argv[1].lower()iflen(sys.argv)>2:id=sys.argv[2]iflen(sys.argv)>3:access=sys.argv[3]iflen(sys.argv)>4:secret=sys.argv[4]iflen(sys.argv)>5:topic=sys.argv[5]ift=='':t=input('select consumer or producer:').lower()ifid=='':id=input('input cid or pid:')ifaccess=='':access=input('input acces key:')ifsecret=='':secret=input('input secret key:')iftopic=='':topic=input('input topic:')ift=='consumer':print('test consumer...')c=Consumer(id,access,secret)c.OnConsumer=OnConsumerc.subscribe(topic,'*')input('press enter to continue test order consumer')c.close()print('test order consumer...')c=OrderConsumer(id,access,secret)c.OnConsumer=OnConsumerc.subscribe(topic,'*')input('press enter exit')c.close()else:tag='tag test'key=time.strftime('%Y%m%d%H%M%S',time.localtime())print('test producer...')p=Producer(id,access,secret)msg=input('input content: ')rst=p.send_msg(topic,tag,key,msg)print(f'result: {rst}')print('test delive msg, delive 30s')rst=p.send_msg(topic,tag,key,msg+'delive',int(time.time()*1000)+30000)print(f'delive msg result: {rst}')p.close()input('press enter continue test order producer')p=OrderProducer(id,access,secret)sharding_key=input('input sharding_key: ')rst=p.send_msg(topic,tag,key,msg,sharding_key)print(f'order msg result: {rst}')p.close()