纯python amqp异步客户端库

aiormq的Python项目详细描述


Coveralls | Status | Drone CI | Latest Version | https://img.shields.io/pypi/wheel/aiormq.svg | https://img.shields.io/pypi/pyversions/aiormq.svg | https://img.shields.io/pypi/l/aiormq.svg

aiormq是一个纯python amqp客户端库。

Status

生产/稳定

Features

  • 通过URL连接
  • amqp example: amqp://user:password@server.host/vhost
  • secure amqp example: amqps://user:password@server.host/vhost?cafile=ca.pem&keyfile=key.pem&certfile=cert.pem&no_verify_ssl=0
  • 接收帧的缓冲队列

  • 仅支持PLAIN身份验证机制

  • 支持Publisher confirms(发布者确认)

  • 支持Transactions(事务)

  • 基于通道的异步锁

    注意

    AMQP 0.9.1要求在同一通道上对某些帧类型进行串行发送。例如,内容正文必须紧跟在内容标题之后发送。但在其他通道上,帧可以异步发送。

  • 跟踪无法路由的消息(使用connection.channel(on_return_raises=False)禁用)

  • 具有url查询参数的完全ssl/tls支持:
    • cafile= - 字符串,CA证书文件的路径
    • capath= - 字符串,CA证书所在的路径
    • cadata= - base64编码的CA证书数据
    • keyfile= - 字符串,密钥文件的路径
    • certfile= - 字符串,证书文件的路径
    • no_verify_ssl - 布尔值,用于禁用证书验证
  • Python type hints

  • 使用pamqp作为amqp 0.9.1帧编码器/解码器

Tutorial

Introduction

Simple consumer

import asyncio
import aiormq


async def on_message(message):
    """
    on_message doesn't necessarily have to be defined as async.
    Here it is to show that it's possible.
    """
    print(" [x] Received message %r" % message)
    print("Message body is: %r" % message.body)
    print("Before sleep!")
    await asyncio.sleep(5)  # Represents async I/O operations
    print("After sleep!")


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('helo')
    consume_ok = await channel.basic_consume(
        declare_ok.queue, on_message, no_ack=True
    )


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()

Simple publisher

import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost//")

    # Creating a channel
    channel = await connection.channel()

    # Sending the message
    await channel.basic_publish(b'Hello World!', routing_key='hello')
    print(" [x] Sent 'Hello World!'")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Work Queues

Create new task

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    body = b' '.join(sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body,
        routing_key='task_queue',
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1,
        )
    )

    print(" [x] Sent %r" % body)

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Simple worker

import asyncio
import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    print(" [x] Received message %r" % (message,))
    print("     Message body is: %r" % (message.body,))


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    # Declaring queue
    declare_ok = await channel.queue_declare('task_queue', durable=True)

    # Start listening the queue with name 'task_queue'
    await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data and runs
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()

Publish Subscribe

Publisher

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='fanout'
    )

    body = b' '.join(sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body, routing_key='info', exchange='logs'
    )

    print(" [x] Sent %r" % (body,))

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Subscriber

import asyncio
import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    print("[x] %r" % (message.body,))

    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    await channel.exchange_declare(
        exchange='logs', exchange_type='fanout'
    )

    # Declaring queue
    declare_ok = await channel.queue_declare(exclusive=True)

    # Binding the queue to the exchange
    await channel.queue_bind(declare_ok.queue, 'logs')

    # Start listening the queue with name 'task_queue'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(' [*] Waiting for logs. To exit press CTRL+C')
loop.run_forever()

Routing

Direct consumer

import sys
import asyncio
import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    print(" [x] %r:%r" % (message.delivery.routing_key, message.body))
    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = aiormq.Connection("amqp://guest:guest@localhost/")
    await connection.connect()

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    severities = sys.argv[1:]

    if not severities:
        sys.stderr.write(
            "Usage: %s [info] [warning] [error]\n" % sys.argv[0]
        )
        sys.exit(1)

    # Declare an exchange
    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    # Declaring random queue
    declare_ok = await channel.queue_declare(durable=True, auto_delete=True)

    for severity in severities:
        await channel.queue_bind(
            declare_ok.queue, 'logs', routing_key=severity
        )

    # Start listening the random queue
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()

Emitter

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    body = (
        b' '.join(arg.encode() for arg in sys.argv[2:])
        or
        b"Hello World!"
    )

    # Sending the message
    routing_key = sys.argv[1] if len(sys.argv) > 2 else 'info'

    await channel.basic_publish(
        body, exchange='logs', routing_key=routing_key,
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1
        )
    )

    print(" [x] Sent %r" % body)

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Topics

Publisher

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare('topic_logs', exchange_type='topic')

    routing_key = (
        sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
    )

    body = (
        b' '.join(arg.encode() for arg in sys.argv[2:])
        or
        b"Hello World!"
    )

    # Sending the message
    await channel.basic_publish(
        body, exchange='topic_logs', routing_key=routing_key,
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1
        )
    )

    print(" [x] Sent %r" % (body,))

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Consumer

import asyncio
import sys
import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    print(" [x] %r:%r" % (message.delivery.routing_key, message.body))
    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = await aiormq.connect(
        "amqp://guest:guest@localhost/", loop=loop
    )

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    # Declare an exchange
    await channel.exchange_declare('topic_logs', exchange_type='topic')

    # Declaring queue
    declare_ok = await channel.queue_declare('task_queue', durable=True)

    binding_keys = sys.argv[1:]

    if not binding_keys:
        sys.stderr.write(
            "Usage: %s [binding_key]...\n" % sys.argv[0]
        )
        sys.exit(1)

    for binding_key in binding_keys:
        await channel.queue_bind(
            declare_ok.queue, 'topic_logs', routing_key=binding_key
        )

    # Start listening the queue with name 'task_queue'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for
# data and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()

Remote procedure call (RPC)

RPC server

import asyncio
import aiormq
import aiormq.types


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


async def on_message(message: aiormq.types.DeliveredMessage):
    n = int(message.body.decode())

    print(" [.] fib(%d)" % n)
    response = str(fib(n)).encode()

    await message.channel.basic_publish(
        response, routing_key=message.reply_to,
        properties=aiormq.spec.Basic.Properties(
            correlation_id=message.correlation_id
        ),
    )

    await message.channel.basic_ack(message.delivery.delivery_tag)
    print('Request complete')


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('rpc_queue')

    # Start listening the queue with name 'hello'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main(loop))

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [x] Awaiting RPC requests")
loop.run_forever()

RPC client

import asyncio
import uuid
import aiormq
import aiormq.types


class FibonacciRpcClient:
    def __init__(self):
        self.connection = None      # type: aiormq.Connection
        self.channel = None         # type: aiormq.Channel
        self.callback_queue = ''
        self.futures = {}
        self.loop = loop

    async def connect(self):
        self.connection = await aiormq.connect("amqp://guest:guest@localhost/")

        self.channel = await self.connection.channel()
        declare_ok = await self.channel.queue_declare(
            exclusive=True, auto_delete=True
        )

        await self.channel.basic_consume(declare_ok.queue, self.on_response)

        self.callback_queue = declare_ok.queue

        return self

    async def on_response(self, message: aiormq.types.DeliveredMessage):
        future = self.futures.pop(message.correlation_id)
        future.set_result(message.body)

    async def call(self, n):
        correlation_id = str(uuid.uuid4())
        future = loop.create_future()

        self.futures[correlation_id] = future

        await self.channel.basic_publish(
            str(n).encode(), routing_key='rpc_queue',
            properties=aiormq.spec.Basic.Properties(
                content_type='text/plain',
                correlation_id=correlation_id,
                reply_to=self.callback_queue,
            )
        )

        return int(await future)


async def main():
    fibonacci_rpc = await FibonacciRpcClient().connect()
    print(" [x] Requesting fib(30)")
    response = await fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java在读取属性文件时获取空指针   java NoSuchMethodError:org。springframework。靴子网状物servlet。错误错误控制器。最新SpringCloudStarter NetflixZuul中的getErrorPath()   java Spring不使用相同的JDBC连接   sqlite DB中带方括号的java数据   如何编译基于Maven的Java项目以从命令行运行它   java如何限制cowndown计时器的操作(例如登录)   java如何使用spring和springboot应用程序配置数据库?我想知道如何回答这类问题?   java中的buildpath不支持java。图书馆路径   java如何使用条目集在树映射上迭代?   java如何将IndexOf与Scanner结合使用?   xml Java SAX解析器进程监视   java在多台远程机器上运行并行junit测试   当我尝试在ListView中动态添加项时,单击按钮时java崩溃