用于Liftbridge的Python客户端。

liftclient的Python项目详细描述


Python升降桥

PyPIGitHub

此项目正在开发中。

Python客户机,Liftbridge,一个为NATS提供轻量级、容错消息流的系统。在

Liftbridge提供以下高级功能:

  • 基于日志的NAT API
  • 复制以实现容错
  • 水平扩展
  • 通配符订阅支持
  • 至少一次传递支持和消息重播
  • 消息键值支持
  • 按键压缩日志

安装

$ pip install python-liftbridge

基本用途

^{pr2}$

创建流

Streams是附加到NATS主题的持久消息日志。他们记录下发布给受试者的信息以供消费。在

流有几个关键属性:subject,它是对应的NATS主题,name是流的可读标识符,以及replication factor,它是为了冗余而应该将流复制到的节点数。可选地,有一个组,它是要加入的流的负载平衡组的名称。当同一组中有多个流时,消息将在其中进行平衡。在

"""    Create a stream attached to the NATS subject "foo.*" that is replicated to    all the brokers in the cluster. ErrStreamExists is returned if a stream with    the given name already exists for the subject."""client.create_stream(Stream(subject='foo.*',name='my-stream',max_replication=True))

订阅开始/重播选项

Subscriptions是提升桥流的消耗方式。客户端可以选择从何处开始使用流中的消息。这是使用传递给Subscribe的选项来控制的。在

# Subscribe starting with new messages only.client.subscribe(Stream(subject='foo',name='foo-stream'))# Subscribe starting with the most recently published value.client.subscribe(Stream(subject='foo',name='foo-stream').start_at_earliest_received())# Subscribe starting with the oldest published value.client.subscribe(Stream(subject='foo',name='foo-stream').start_at_latest_received())# Subscribe starting at a specific offset.client.subscribe(Stream(subject='foo',name='foo-stream').start_at_offset(4))# Subscribe starting at a specific time.client.subscribe(Stream(subject='foo',name='foo-stream').start_at_time(datetime.now()))# Subscribe starting at a specific amount of time in the past.client.subscribe(Stream(subject='foo',name='foo-stream').start_at_time_delta(timedelta(days=1)))

发布

提供了一个发布API,以便于将消息写入流。这包括许多选项,用于用消息键等元数据装饰消息。在

键用于Liftbridge的原木压缩。启用后,提升桥流将只保留给定密钥的最后一条消息。在

# Publish a message with a keyclient.publish(Message(subject='foo',value='Hello',key='key'))

直接使用NAT发布

由于Liftbridge是NATS的扩展,因此NATS client也可用于发布消息。这意味着现有的NATS发布者不需要对Liftbridge中的消息进行任何更改。在

如何作出贡献

  1. 检查是否有未解决的问题,或者打开一个新的问题,围绕某个特性想法或bug展开讨论。在
  2. 在GitHub上分叉the repository,开始对主分支(或其分支)进行更改。在
  3. 编写一个测试,测试该错误是否已修复,或者该功能是否按预期工作。在
  4. 发送一个pull request和bug me,直到合并并发布为止。在

待办事项:

  • []添加文档(Sphynx)
  • []添加CI(CircleCI或TravisCI)
  • []添加测试
  • 代码覆盖率添加[]
  • []添加对gRPC的TLS支持
  • []添加邮件头支持
  • []添加消息确认支持(脚手架已经完成)
  • [x] 关闭连接方法
  • []添加异步客户端
  • []添加gRPC连接池
  • [x] 添加日志记录(并删除所有随机打印)
  • 添加适当的字符串[]
  • [x] 添加版本文件
  • []添加贡献.md以及工作流程的说明(pyenv、tox、make、pre-commit…)
  • []改进获取元数据
  • []改进错误处理
  • [x] 使用Docker添加到makefile run liftbridge中container
  • []更好的仪器/观测能力(OpenCensus支持?)在

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Android:如何写入特定行,Java   Java中从欧元货币字符串中删除空格的数字   Java非均匀多维数组   解密AES时出现java空指针异常   java ConcurrentModificationException尝试移除列表上的所有内容时(非迭代)   Java数学库计算日志   java ISO8601,使用Jackson以毫秒表示json   避免副作用的java最佳实践   java获取JMeterException:调用bsh方法时出错:未定义参数:saa。使用beanshell取样器时   使用javascript将会话从一个jsp页面传输到另一个jsp页面   java在列表中组合相邻元素   java多行JTextPane   java Hibernate映射文件连接两个表而不定义关系?   如何使用Ajax、Java和Spring框架将文件从网页上传到Google云存储   多线程多线程Java中producerconsumer代码的多线程没有提供正确的输出?