apache kafka的twisted python客户端

afkak的Python项目详细描述


afkak是一个Twisted-nativeApache Kafka客户端库。 它支持:

  • 生成消息,具有自动批处理和可选压缩。
  • 使用消息,具有组协调和自动提交功能。

documentation中了解更多信息,下载from PyPI,或查看contribution guidelines。 请报告任何问题on GitHub

状态

afkak支持这些Python:

  • cpython 2.7
  • cpython 3.5、3.6和3.7(在afkak 3.0.0及更高版本中)
  • Pypy和Pypy3 6.0+

我们的目标是支持卡夫卡1.1.x及以后版本。 集成测试针对这些Kafka Broker版本运行:

  • 0.9.0.1
  • 1.1.1

计划对2.0.0进行测试(请参见#45)。

较新的代理版本通常会正常工作,但并非所有的afkak功能都能在较旧的代理上工作。 特别是,协调消费者在kafka 0.9.0.1之前不会工作。 我们不建议部署这样的旧版本,因为它们有严重的错误。

用法

高水平

注意:此代码不是可运行的。见producer_exampleconsumer_example用于可运行的示例代码。

fromafkak.clientimportKafkaClientfromafkak.consumerimportConsumerfromafkak.producerimportProducerfromafkak.commonimport(OFFSET_EARLIEST,PRODUCER_ACK_ALL_REPLICAS,PRODUCER_ACK_LOCAL_WRITE)kClient=KafkaClient("localhost:9092")# To send messagesproducer=Producer(kClient)d1=producer.send_messages("my-topic",msgs=[b"some message"])d2=producer.send_messages("my-topic",msgs=[b"takes a list",b"of messages"])# To get confirmations/errors on the sends, add callbacks to the returned deferredsd1.addCallbacks(handleResponses,handleErrors)# To wait for acknowledgements# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to#                         a local log before sending response# [ the default ]# PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed#                            by all in sync replicas before sending a responseproducer=Producer(kClient,req_acks=Producer.PRODUCER_ACK_LOCAL_WRITE,ack_timeout=2000)responseD=producer.send_messages("my-topic",msgs=[b"message"])# Using twisted's @inlineCallbacks:responses=yieldresponseDifresponse:print(response[0].error)print(response[0].offset)# To send messages in batch: You can use a producer with any of the# partitioners for doing this. The following producer will collect# messages in batch and send them to Kafka after 20 messages are# collected or every 60 seconds (whichever comes first). You can# also batch by number of bytes.# Notes:# * If the producer dies before the messages are sent, the caller would# * not have had the callbacks called on the send_messages() returned# * deferreds, and so can retry.# * Calling producer.stop() before the messages are sent will# errback() the deferred(s) returned from the send_messages call(s)producer=Producer(kClient,batch_send=True,batch_send_every_n=20,batch_send_every_t=60)responseD1=producer.send_messages("my-topic",msgs=[b"message"])responseD2=producer.send_messages("my-topic",msgs=[b"message 2"])# To consume messages# define a function which takes a list of messages to process and# possibly returns a deferred which fires when the processing is# complete.defprocessor_func(consumer,messages):#  Store_Messages_In_Database may return a deferredresult=store_messages_in_database(messages)# record last processed messageconsumer.commit()returnresultthe_partition=3# Consume only from partition 3.consumer=Consumer(kClient,"my-topic",the_partition,processor_func)d=consumer.start(OFFSET_EARLIEST)# Start reading at earliest message# The deferred returned by consumer.start() will fire when an error# occurs that can't handled by the consumer, or when consumer.stop()# is calledyielddconsumer.stop()kClient.close()

按键信息
fromafkak.clientimportKafkaClientfromafkak.producerimportProducerfromafkak.partitionerimportHashedPartitioner,RoundRobinPartitionerkafka=KafkaClient("localhost:9092")# Use the HashedPartitioner so that the producer will use the optional key# argument on send_messages()producer=Producer(kafka,partitioner_class=HashedPartitioner)producer.send_messages("my-topic","key1",[b"some message"])producer.send_messages("my-topic","key2",[b"this method"])

低水平

fromafkak.clientimportKafkaClientkafka=KafkaClient("localhost:9092")req=ProduceRequest(topic="my-topic",partition=1,messages=[KafkaProtocol.encode_message(b"some message")])resps=afkak.send_produce_request(payloads=[req],fail_on_error=True)kafka.close()resps[0].topic# b"my-topic"resps[0].partition# 1resps[0].error# 0 (hopefully)resps[0].offset# offset of the first message sent in this request

安装

afkak版本是available on PyPI

由于afkak依赖项Twistedpython-snappy具有二进制扩展模块,因此需要为要使用的解释器安装python开发头:

Debian/Ubuntu: ^{}
OS X ^{}
^{}

那么afkak可以是installed with pip as usual

许可证

版权所有2013、2014、2015 David Arthur,阿帕奇许可,v2.0。见LICENSE

版权所有2014、2015青色股份有限公司,阿帕奇许可,v2.0。见LICENSE

版权所有2015、2016、2017、2018、2019 Ciena Corporation,阿帕奇许可,v2.0。见LICENSE

这个项目最初是作为kafka-python库的一个端口来twisted的。

有关完整的参与者列表,请参见AUTHORS.md

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
初始化非静态成员:C++与java比较   Java/Spring在线程关闭后为多租户程序获取租户?   java为什么使用WebMVCAutoConfiguration适配器类   java Jsoup将元素转换为TextNode会导致异常   java线程概念中的多线程同步   乔达皈依。时间DateTime到java。sql。日期和时区   java线程检查问题   java JavaFX显示列表的视图是什么类型的?   java Map UI组件未显示在Cuba平台中   java neo4j示例“MyRestaurantSocial”错误   java在Android中定制键盘“回车”按钮   while循环和打印列表中的Java减量变量   即使相同的处理程序正常工作,也会引发java空指针异常