纯python amqp异步客户端库
aiormq的Python项目详细描述
aiormq是一个纯python amqp客户端库。
目录
Status
生产/稳定
Features
- 通过URL连接
- amqp example: amqp://user:password@server.host/vhost
- secure amqp example: amqps://user:password@server.host/vhost?cafile=ca.pem&keyfile=key.pem&certfile=cert.pem&no_verify_ssl=0
接收帧的缓冲队列
仅支持PLAIN身份验证机制
基于通道的异步锁
注意
AMQP 0.9.1要求某些帧类型在同一通道上按顺序(串行)发送。例如,内容正文帧必须紧跟在内容头帧之后发送。但在其他通道上,帧可以异步发送。
跟踪无法路由的消息 (使用connection.channel(on_return_raises=False)禁用)
- 具有url查询参数的完全ssl/tls支持:
- cafile= - 字符串,CA证书文件的路径
- capath= - 字符串,CA证书所在目录的路径
- cadata= - base64编码的CA证书数据
- keyfile= - 字符串,密钥文件的路径
- certfile= - 字符串,证书文件的路径
- no_verify_ssl - 布尔值,用于禁用证书验证
Python type hints
使用pamqp作为amqp 0.9.1帧编码器/解码器
Tutorial
Introduction
Simple consumer
import asyncio

import aiormq


async def on_message(message):
    """
    on_message doesn't necessarily have to be defined as async.
    Here it is to show that it's possible.
    """
    # Wrap the argument in a 1-tuple so %-formatting cannot misinterpret a
    # tuple-like message as several format arguments.
    print(" [x] Received message %r" % (message,))
    print("Message body is: %r" % (message.body,))
    print("Before sleep!")
    await asyncio.sleep(5)  # Represents async I/O operations
    print("After sleep!")


async def main():
    """Connect to the broker and start consuming from the 'hello' queue."""
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue; the name matches the routing key used by the
    # simple publisher example ('hello').
    declare_ok = await channel.queue_declare('hello')
    await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# Keep the loop alive so on_message keeps being invoked.
loop.run_forever()
Simple publisher
import asyncio

import aiormq


async def main():
    """Publish a single 'Hello World!' message with routing key 'hello'."""
    # Open the connection, then a channel on top of it
    conn = await aiormq.connect("amqp://guest:guest@localhost//")
    chan = await conn.channel()

    # Send the message
    payload = b'Hello World!'
    await chan.basic_publish(payload, routing_key='hello')

    print(" [x] Sent 'Hello World!'")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Work Queues
Create new task
import sys
import asyncio

import aiormq


async def main():
    """Publish the command-line arguments as one task message.

    Falls back to b"Hello World!" when no arguments are given.
    """
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # sys.argv holds str values; bytes.join() only accepts bytes-like items,
    # so each argument is encoded before joining.
    body = b' '.join(arg.encode() for arg in sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body,
        routing_key='task_queue',
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1,
        )
    )

    print(" [x] Sent %r" % body)

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Simple worker
import asyncio

import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    """Print the received message and its body."""
    received = (message,)
    payload = (message.body,)
    print(" [x] Received message %r" % received)
    print(" Message body is: %r" % payload)


async def main():
    """Connect, declare the durable 'task_queue' and start consuming from it."""
    # Open the connection, then a channel on top of it
    conn = await aiormq.connect("amqp://guest:guest@localhost/")
    chan = await conn.channel()
    await chan.basic_qos(prefetch_count=1)

    # Declare the queue
    queue_info = await chan.queue_declare('task_queue', durable=True)

    # Start listening the queue with name 'task_queue'
    await chan.basic_consume(queue_info.queue, on_message, no_ack=True)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data and runs
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
Publish Subscribe
Publisher
import sys
import asyncio

import aiormq


async def main():
    """Publish the command-line arguments to the 'logs' fanout exchange.

    Falls back to b"Hello World!" when no arguments are given.
    """
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='fanout'
    )

    # sys.argv holds str values; bytes.join() only accepts bytes-like items,
    # so each argument is encoded before joining.
    body = b' '.join(arg.encode() for arg in sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body, routing_key='info', exchange='logs'
    )

    print(" [x] Sent %r" % (body,))

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Subscriber
import asyncio

import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    """Print the message body, then acknowledge the delivery."""
    print("[x] %r" % (message.body,))

    tag = message.delivery.delivery_tag
    await message.channel.basic_ack(tag)


async def main():
    """Bind an exclusive queue to the 'logs' fanout exchange and consume it."""
    # Open the connection, then a channel on top of it
    conn = await aiormq.connect("amqp://guest:guest@localhost/")
    chan = await conn.channel()
    await chan.basic_qos(prefetch_count=1)

    await chan.exchange_declare(exchange='logs', exchange_type='fanout')

    # Declare an exclusive queue (no explicit name is passed)
    queue_info = await chan.queue_declare(exclusive=True)
    queue_name = queue_info.queue

    # Bind the queue to the exchange
    await chan.queue_bind(queue_name, 'logs')

    # Start consuming from the queue declared above
    await chan.basic_consume(queue_name, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(' [*] Waiting for logs. To exit press CTRL+C')
loop.run_forever()
Routing
Direct consumer
import sys
import asyncio

import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    """Print routing key and body, then acknowledge the delivery."""
    info = (message.delivery.routing_key, message.body)
    print(" [x] %r:%r" % info)

    tag = message.delivery.delivery_tag
    await message.channel.basic_ack(tag)


async def main():
    """Bind a queue to the direct 'logs' exchange for each given severity."""
    # Build the connection object first, then connect explicitly
    conn = aiormq.Connection("amqp://guest:guest@localhost/")
    await conn.connect()

    # Open a channel
    chan = await conn.channel()
    await chan.basic_qos(prefetch_count=1)

    # Severities come from the command line; at least one is required
    severities = sys.argv[1:]
    if len(severities) == 0:
        sys.stderr.write(
            "Usage: %s [info] [warning] [error]\n" % sys.argv[0]
        )
        sys.exit(1)

    # Declare an exchange
    await chan.exchange_declare(exchange='logs', exchange_type='direct')

    # Declare a queue without an explicit name
    queue_info = await chan.queue_declare(durable=True, auto_delete=True)
    queue_name = queue_info.queue

    # One binding per requested severity
    for level in severities:
        await chan.queue_bind(queue_name, 'logs', routing_key=level)

    # Start consuming from the queue declared above
    await chan.basic_consume(queue_name, on_message)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
Emitter
import sys
import asyncio

import aiormq


async def main():
    """Publish a log message to the direct 'logs' exchange.

    Usage: script [severity] [message...]
    The first argument is the routing key (default 'info'); the remaining
    arguments form the message body (default b"Hello World!").
    """
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    body = (
        b' '.join(arg.encode() for arg in sys.argv[2:])
        or b"Hello World!"
    )

    # The severity is argv[1] whenever it is present, even if no message
    # text follows it (the body then falls back to the default above).
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'info'

    # Sending the message
    await channel.basic_publish(
        body, exchange='logs', routing_key=routing_key,
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1
        )
    )

    print(" [x] Sent %r" % body)

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Topics
Publisher
import sys
import asyncio

import aiormq


async def main():
    """Publish a message to the 'topic_logs' topic exchange."""
    # Open the connection, then a channel on top of it
    conn = await aiormq.connect("amqp://guest:guest@localhost/")
    chan = await conn.channel()

    await chan.exchange_declare('topic_logs', exchange_type='topic')

    # Routing key is taken from argv[1] only when message text follows it
    if len(sys.argv) > 2:
        routing_key = sys.argv[1]
    else:
        routing_key = 'anonymous.info'

    # Body is built from the remaining arguments, with a default fallback
    words = [word.encode() for word in sys.argv[2:]]
    payload = b' '.join(words) or b"Hello World!"

    # Send the message
    props = aiormq.spec.Basic.Properties(delivery_mode=1)
    await chan.basic_publish(
        payload,
        exchange='topic_logs',
        routing_key=routing_key,
        properties=props,
    )

    print(" [x] Sent %r" % (payload,))

    await conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Consumer
import asyncio
import sys

import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    """Print routing key and body, then acknowledge the delivery."""
    info = (message.delivery.routing_key, message.body)
    print(" [x] %r:%r" % info)

    tag = message.delivery.delivery_tag
    await message.channel.basic_ack(tag)


async def main():
    """Bind 'task_queue' to 'topic_logs' for every binding key in argv."""
    # Open the connection (uses the module-level ``loop``), then a channel
    conn = await aiormq.connect(
        "amqp://guest:guest@localhost/", loop=loop
    )
    chan = await conn.channel()
    await chan.basic_qos(prefetch_count=1)

    # Declare an exchange
    await chan.exchange_declare('topic_logs', exchange_type='topic')

    # Declare the queue
    queue_info = await chan.queue_declare('task_queue', durable=True)
    queue_name = queue_info.queue

    # Binding keys come from the command line; at least one is required
    binding_keys = sys.argv[1:]
    if len(binding_keys) == 0:
        sys.stderr.write(
            "Usage: %s [binding_key]...\n" % sys.argv[0]
        )
        sys.exit(1)

    for key in binding_keys:
        await chan.queue_bind(queue_name, 'topic_logs', routing_key=key)

    # Start listening the queue with name 'task_queue'
    await chan.basic_consume(queue_name, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for
# data and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
Remote procedure call (RPC)
RPC server
import asyncio

import aiormq
import aiormq.types


def fib(n):
    """Return the n-th Fibonacci number (naive recursion, n >= 0)."""
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


async def on_message(message: aiormq.types.DeliveredMessage):
    """Compute fib(n) for the request body and publish the reply."""
    n = int(message.body.decode())

    print(" [.] fib(%d)" % n)
    response = str(fib(n)).encode()

    # NOTE(review): assumes the delivered message exposes reply_to and
    # correlation_id directly - confirm against the aiormq version in use.
    await message.channel.basic_publish(
        response, routing_key=message.reply_to,
        properties=aiormq.spec.Basic.Properties(
            correlation_id=message.correlation_id
        ),
    )

    await message.channel.basic_ack(message.delivery.delivery_tag)
    print('Request complete')


async def main():
    """Connect, declare 'rpc_queue' and start serving requests from it."""
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('rpc_queue')

    # Start listening the queue with name 'rpc_queue'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
# main() takes no parameters; passing the loop would raise TypeError.
loop.create_task(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [x] Awaiting RPC requests")
loop.run_forever()
RPC client
import asyncio
import uuid

import aiormq
import aiormq.types


class FibonacciRpcClient:
    """RPC client that requests Fibonacci numbers through 'rpc_queue'."""

    def __init__(self):
        self.connection = None      # type: aiormq.Connection
        self.channel = None         # type: aiormq.Channel
        self.callback_queue = ''
        # Pending requests: correlation id -> future resolved by on_response
        self.futures = {}
        # Resolve the loop here instead of relying on a module-level global
        # that may not exist yet when the class is instantiated.
        self.loop = asyncio.get_event_loop()

    async def connect(self):
        """Open connection and channel, and consume the reply queue.

        Returns self so the call can be chained after the constructor.
        """
        self.connection = await aiormq.connect("amqp://guest:guest@localhost/")

        self.channel = await self.connection.channel()
        declare_ok = await self.channel.queue_declare(
            exclusive=True, auto_delete=True
        )

        await self.channel.basic_consume(declare_ok.queue, self.on_response)

        self.callback_queue = declare_ok.queue

        return self

    async def on_response(self, message: aiormq.types.DeliveredMessage):
        """Resolve the pending future that matches the reply."""
        # NOTE(review): assumes the delivered message exposes correlation_id
        # directly - confirm against the aiormq version in use.
        future = self.futures.pop(message.correlation_id)
        future.set_result(message.body)

    async def call(self, n):
        """Send n to 'rpc_queue' and return the reply converted to int."""
        correlation_id = str(uuid.uuid4())
        # Use the loop stored on the instance, not a module-level global.
        future = self.loop.create_future()

        self.futures[correlation_id] = future

        await self.channel.basic_publish(
            str(n).encode(), routing_key='rpc_queue',
            properties=aiormq.spec.Basic.Properties(
                content_type='text/plain',
                correlation_id=correlation_id,
                reply_to=self.callback_queue,
            )
        )

        return int(await future)


async def main():
    """Request fib(30) from the RPC server and print the result."""
    fibonacci_rpc = await FibonacciRpcClient().connect()
    print(" [x] Requesting fib(30)")
    response = await fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())