用于Google云发布/订阅的Asyncio Python客户端

gcloud-aio-pubsub-fsg的Python项目详细描述


Latest PyPI VersionPython Version Support

安装

$ pip install --upgrade gcloud-aio-pubsub

使用

此发布/订阅实现基于google-cloud-pubsub >= 0.29.4

目前我们只实现了^{tt2}的异步版本$ 因为默认情况下,订阅模式不适用于asyncio。官员 googlepublisher返回的是一个基本上可用的未来。我们还没有 看到需要构建库的非asyncio线程安全版本 上游的谷歌图书馆已经很好地处理了这个问题。在

以下是订阅的大致使用模式:

^{pr2}$

配置

我们的create_subscription方法是一个thing包装器,因此支持all关键字 来自官方pubsub客户端的配置参数,您可以在 official Google documentation。在

订阅订阅时,可以选择传入FlowControl 和/或Scheduler实例。在

example_flow_control=FlowControl(max_messages=1,resume_threshold=0.8,max_request_batch_size=1,max_request_batch_latency=0.1,max_lease_duration=10,)keep_alive=client.subscribe('subscription_name',message_callback,flow_control=example_flow_control)

了解修改FlowControl如何影响pubsub运行时 会操作可能会令人困惑,所以这里有一个方便的花花公子指南!在

欢迎使用@TheKevJames的Google Pubsub订阅配置指南 政策!安顿下来,喝一杯,然后呆一会儿。在

订阅服务器由定义的FlowControl配置元组控制 here: 该配置对象f由以下订阅服务器使用 方法:

最大并发

订户可以在当前租用的任何时候租用新任务 任务x满足:

((len(x)<f.resume_threshold*f.max_messages)and(sum(x.bytes)<f.resume_threshold*f.max_bytes))

在实践中,这意味着我们应该使用以下内容设置这些值 限制条件:

  • 峰值并发租用任务的最大数量为: = (f.max_messages * f.resume_threshold) + f.max_request_batch_size
  • 我们租用任务在峰值时的最大内存使用量为: = (f.max_bytes * f.resume_threshold) + (f.max_request_batch_size * bytes_per_task)
  • 这些价值观是相互制约的,即我们把自己局限于较小的价值观 其中: max_tasks * bytes_per_task <> max_memory

旁白:似乎Pubsub上的ocn每个大约有1538个字节

租赁请求

租赁新任务时,Subscriber使用以下算法:

deflease_more_tasks():start=time.now()yieldqueue.Queue.get(block=True)# always returns >=1for_inrange(f.max_request_batch_size-1):elapsed=time.now()-startyieldqueue.Queue.get(block=False,timeout=f.max_request_batch_latency-elapsed)ifelapsed>=f.max_request_batch_latency:break

实际上,这意味着我们应该在给定 以上并发问题并设置f.max_request_batch_latencygiven 不管我们愿意接受的延迟比是多少。在

对于一个满队列Queue.get(),预期的最佳情况时间也没有变差 这应该比0.3mpc的请求快 谷歌Pubsub,它应该足够快(tm)来保持它的填充,给定 这些请求是批处理的。在

因此,我们可以预期:

  • 平均租约延迟:~= f.max_request_batch_size * 0.0003
  • 最坏情况延迟:~= f.max_request_batch_latency

注意,租赁是基于f.resume_threshold发生的,因此 延迟与任务执行同步。在

任务到期

任何尚未确认或取消确认的任务都将计入当前租用的任务 任务计数。我们的工作线程应该确保所有任务都被确认或取消,但是 FlowControlconfig允许我们处理任何其他情况。请注意 租赁工程如下:

  • 当订阅者租用一个任务时,googlepubsub不会再租赁它 任务到subscription.ack_deadline_seconds = 10(可配置 每个订阅)秒已过。在
  • 如果一个客户对一个任务调用ack(),它将立即从Google中删除 普布Sub。在
  • 如果一个客户端在一个任务上调用nack(),它将立即允许googlepubsub 将该任务重新租借给新客户机。客户端从其 记忆。在
  • 如果f.max_lease_duration在租用和确认的消息之间传递, 客户端将发送一个nack(请参阅上面的工作流)。它不会掉 任务从它的内存-例如,worker(task)进程可能仍在运行。在

注意事项:

  • 所有的步骤都是最好的努力,例如将“任务将被删除”改为“任务将被删除” 可能会被删除,如果分布式系统幸运的话”
  • 在上面的工作流中,“googlepubsub”指的是服务器端系统,例如。 由Google管理,任务是ac实际储存。在

实际上,我们应该将f.max_lease_duration设置为不低于 高负载下95%的任务延迟。这个值越低, 在极端情况下,我们的吞吐量会更好。在

混乱

已定义f.max_requests,但似乎未使用。在

贡献

请看我们的contributing guide。在

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Java通过HTTP GET请求将Web浏览器打开到URI   带睡眠线程的java暂停秒表计时器?   java程序在试图分析字符串时冻结   JavaJ2ME:如何从联系人列表导入联系人?   ApachePOIJava将html转换为pdf   基于Tweet长度的java文本过滤   java如何从xml文件中提取数据作为代码的输入   java SonarQube抱怨检查集合中的列表#包含<?扩展整数>   java轻松在所有活动按钮上设置setOnClickListener()   java使用JavaHg设置Mercurial选项   java Webdriver TestNG空指针异常   java使用NamedQueries查询加密列   运行java程序时出现Hibernate异常无限运行   java将ActionListener添加到自定义组件   java Log4J SQL日志TopLink   java使用selenium javascript上传文件错误   Java中静态块的执行   java为什么要使用MavenGPGPlugin用GnuPG签署项目的工件?   java使用截击与AndroidApp通信ESP8266