一个完整的,基于gevent的,非tornado nsq客户端。
nsqs的Python项目详细描述
该项目正在积极开发中,文档正在演变为 单个零件。
这个项目封装了连接管理、心跳管理和 将传入消息(针对使用者)分派给处理程序。
功能
- 全功能:
- 快速压缩
- 压缩放气
- TLS压缩
- 通过TLS进行客户端(“相互”)身份验证
- 我们依赖于消费者定义的“分类”函数来确定 传入消息的处理程序的名称。这允许事件驱动 消费。这意味着最终用户的锅炉板要少一些。 RDY管理的复杂性是由图书馆自动管理的。 这些参数可以重新配置,但是nsqs强调简单性和 直觉,这样你就不必参与机械 想。
- 标识参数可以直接指定,但许多参数是管理的 根据生产者/消费者的参数自动设置。
- 邮件处理后在服务器上标记为“已完成” 除非我们被配置成不。
- 对于消费者,您可以指定主题和频道对联的列表,以及 将连接到每个服务器并根据每个服务器进行订阅。 如果使用查找服务器,则会为每个 列表中的主题(如果没有查找服务器,则假定所有服务器 支持所有主题)。
实现消费者
进口和样板:
import logging import json import gevent import nsq.consumer import nsq.node_collection import nsq.message_handler _logger = logging.getLogger(__name__)
创建消息处理程序:
class _MessageHandler(nsq.message_handler.MessageHandler): def __init__(self, *args, **kwargs): super(_MessageHandler, self).__init__(*args, **kwargs) self.__processed = 0 def message_received(self, connection, message): super(_MessageHandler, self).message_received(connection, message) try: self.__decoded = json.loads(message.body) except: _logger.info("Couldn't decode message. Finished: [%s]", message.body) return def classify_message(self, message): return (self.__decoded['type'], self.__decoded) def handle_dummy(self, connection, message, context): self.__processed += 1 if self.__processed % 1000 == 0: _logger.info("Processed (%d) messages.", self.__processed) def default_message_handler(self, message_class, connection, message, classify_context): _logger.warning("Squashing unhandled message: [%s] [%s]", message_class, message)
定义节点集合。我们在这里使用nsqlookupd服务器,但是我们可以 与nsqd服务器一起使用servernodes()一样简单:
lookup_node_prefixes = [ 'http://127.0.0.1:4161', ] nc = nsq.node_collection.LookupNodes(lookup_node_prefixes)
创建使用者对象:
_TOPIC = 'test_topic' _CHANNEL = 'test_channel' _MAX_IN_FLIGHT = 500 c = nsq.consumer.Consumer( [(_TOPIC, _CHANNEL)], nc, _MAX_IN_FLIGHT, message_handler_cls=_MessageHandler)
启动消费者:
c.start()
循环。例如,只要我们至少连接到一个 服务器:
while c.is_alive: gevent.sleep(1)
实现生产者
进口和样板:
import logging import json import random import nsq.producer import nsq.node_collection import nsq.message_handler _logger = logging.getLogger(__name__)
定义节点集合。这是一个生产者,因此它只与nsqd一起工作。 节点:
server_nodes = [ ('127.0.0.1', 4150), ] nc = nsq.node_collection.ServerNodes(server_nodes)
创建producer对象:
_TOPIC = 'test_topic' p = nsq.producer.Producer(_TOPIC, nc)
启动制作程序:
p.start()
发出消息:
for i in range(0, 100000, 10): if i % 50 == 0: _logger.info("(%d) messages published.", i) data = { 'type': 'dummy', 'data': random.random(), 'index': i } message = json.dumps(data) p.mpublish((message,) * 10)
停止制作:
p.stop()
回调
消费者和生产者都可以接受回调对象。
实例化producer的回调:
import nsq.connection_callbacks cc = nsq.connection_callbacks.ConnectionCallbacks()
实例化consumer的回调:
import nsq.consumer cc = nsq.consumer.ConsumerCallbacks()
然后,将对象作为 ccallbacks。
以下回调方法可以为生产者或 消费者(同时确保调用原始实现):
连接(连接)
连接已建立。
识别(连接)
已处理此连接的标识响应。
断开(连接)
连接已断开。
收到的消息(连接,消息)
已收到消息。
consumer有一个附加回调:
rdy\u补充(连接、当前、原始)
RDY需要更新。默认情况下,将重新提交原始RDY。 如果不需要,请重写此回调,不要调用原始回调。
脚注
- 因为我们依赖gevent,而gevent不是 python3兼容,nsqs不兼容python3。