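Twisted Python client for Apache Kafka
Detailed description of the afkak Python project
Afkak is a Twisted-native Apache Kafka client library. It supports:
- Producing messages, with automatic batching and optional compression.
- Consuming messages, with group coordination and automatic commit.
Learn more in the documentation, download from PyPI, or review the contribution guidelines. Please report any issues on GitHub.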
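Status
Afkak supports these Python implementations:
- CPython 2.7
- CPython 3.5, 3.6, and 3.7 (in Afkak 3.0.0 and later)
- PyPy and PyPy3 6.0+
We aim to support Kafka 1.1.x and later. Integration tests are run against these Kafka broker versions: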
- 0.9.0.1
- 1.1.1
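Testing against 2.0.0 is planned (see #45).
Newer broker releases will generally work, but not all Afkak features will work on older brokers. In particular, the coordinated consumer won't work with Kafka releases before 0.9.0.1. We don't recommend deploying such old releases, as they have serious bugs.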
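The version cutoff above can be expressed as a small check. This is only a sketch: `parse_version` and `supports_coordinated_consumer` are hypothetical helpers, not part of Afkak, and the broker version string is assumed to be known to the operator.

```python
def parse_version(text):
    """Turn '0.9.0.1' into (0, 9, 0, 1) so versions compare numerically."""
    return tuple(int(part) for part in text.split("."))


# Oldest broker release on which the coordinated consumer works,
# according to the paragraph above.
MIN_COORDINATED_CONSUMER = parse_version("0.9.0.1")


def supports_coordinated_consumer(broker_version):
    """Hypothetical helper: True if the broker is 0.9.0.1 or newer."""
    return parse_version(broker_version) >= MIN_COORDINATED_CONSUMER


for version in ("0.8.2.2", "0.9.0.1", "1.1.1"):
    print(version, supports_coordinated_consumer(version))
```

Comparing integer tuples rather than raw strings avoids ordering mistakes such as "0.10.0" sorting before "0.9.0".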
Usage
High level
Note: This code is not meant to be runnable. See producer_example and consumer_example for runnable example code.
    from afkak.client import KafkaClient
    from afkak.consumer import Consumer
    from afkak.producer import Producer
    from afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
                              PRODUCER_ACK_LOCAL_WRITE)

    kClient = KafkaClient("localhost:9092")

    # To send messages
    producer = Producer(kClient)
    d1 = producer.send_messages("my-topic", msgs=[b"some message"])
    d2 = producer.send_messages("my-topic", msgs=[b"takes a list", b"of messages"])
    # To get confirmations/errors on the sends, add callbacks to the returned deferreds
    d1.addCallbacks(handleResponses, handleErrors)

    # To wait for acknowledgements
    # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
    #                            a local log before sending response
    #                            [ the default ]
    # PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
    #                             by all in sync replicas before sending a response
    producer = Producer(kClient,
                        req_acks=PRODUCER_ACK_LOCAL_WRITE,
                        ack_timeout=2000)

    responseD = producer.send_messages("my-topic", msgs=[b"message"])

    # Using twisted's @inlineCallbacks:
    responses = yield responseD
    if responses:
        print(responses[0].error)
        print(responses[0].offset)

    # To send messages in batch: You can use a producer with any of the
    # partitioners for doing this. The following producer will collect
    # messages in batch and send them to Kafka after 20 messages are
    # collected or every 60 seconds (whichever comes first). You can
    # also batch by number of bytes.
    # Notes:
    # * If the producer dies before the messages are sent, the caller would
    #   not have had the callbacks called on the send_messages() returned
    #   deferreds, and so can retry.
    # * Calling producer.stop() before the messages are sent will
    #   errback() the deferred(s) returned from the send_messages call(s)
    producer = Producer(kClient, batch_send=True,
                        batch_send_every_n=20,
                        batch_send_every_t=60)
    responseD1 = producer.send_messages("my-topic", msgs=[b"message"])
    responseD2 = producer.send_messages("my-topic", msgs=[b"message 2"])

    # To consume messages
    # define a function which takes a list of messages to process and
    # possibly returns a deferred which fires when the processing is
    # complete.
    def processor_func(consumer, messages):
        # store_messages_in_database may return a deferred
        result = store_messages_in_database(messages)
        # record last processed message
        consumer.commit()
        return result

    the_partition = 3  # Consume only from partition 3.
    consumer = Consumer(kClient, "my-topic", the_partition, processor_func)
    d = consumer.start(OFFSET_EARLIEST)  # Start reading at earliest message
    # The deferred returned by consumer.start() will fire when an error
    # occurs that can't be handled by the consumer, or when consumer.stop()
    # is called
    yield d

    consumer.stop()
    kClient.close()
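The batching rule described in the comments (send after 20 messages or 60 seconds, whichever comes first) can be illustrated without Kafka or Twisted. The sketch below is a standard-library stand-in, not Afkak's implementation: `BatchBuffer`, its parameters, and the injectable `clock` are all hypothetical names chosen for the illustration.

```python
import time


class BatchBuffer:
    """Collects messages and reports when a batch is due (illustrative only)."""

    def __init__(self, every_n=20, every_t=60.0, clock=time.monotonic):
        self.every_n = every_n   # flush once this many messages are pending
        self.every_t = every_t   # or once the oldest one has waited this long
        self.clock = clock       # injectable so the rule can be tested
        self.messages = []
        self.started = None      # time the first pending message arrived

    def add(self, msg):
        """Queue a message; return the batch to send if one is due, else None."""
        if not self.messages:
            self.started = self.clock()
        self.messages.append(msg)
        return self.flush_if_due()

    def flush_if_due(self):
        """Return and clear the pending batch once either threshold is reached."""
        if not self.messages:
            return None
        too_many = len(self.messages) >= self.every_n
        too_old = self.clock() - self.started >= self.every_t
        if too_many or too_old:
            batch, self.messages = self.messages, []
            return batch
        return None


# Demo with a fake clock so that no real waiting is needed.
now = [0.0]
buf = BatchBuffer(every_n=3, every_t=60.0, clock=lambda: now[0])
buf.add(b"a")
buf.add(b"b")
print(buf.add(b"c"))        # the count threshold triggers the batch
buf.add(b"d")
now[0] = 61.0
print(buf.flush_if_due())   # the time threshold triggers the batch
```

In a real producer a timer would call something like `flush_if_due()` periodically. Here the caller drives it by hand to keep the sketch short.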
Keyed messages
    from afkak.client import KafkaClient
    from afkak.producer import Producer
    from afkak.partitioner import HashedPartitioner, RoundRobinPartitioner

    kafka = KafkaClient("localhost:9092")

    # Use the HashedPartitioner so that the producer will use the optional key
    # argument on send_messages()
    producer = Producer(kafka, partitioner_class=HashedPartitioner)
    producer.send_messages("my-topic", "key1", [b"some message"])
    producer.send_messages("my-topic", "key2", [b"this method"])
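To show why a key matters, here is a minimal sketch of the two partitioning strategies named in the import. It is not Afkak's code: `hashed_partition` and `round_robin` are hypothetical functions, and `zlib.crc32` is only a stand-in hash picked because it ships with Python.

```python
import itertools
import zlib


def hashed_partition(key, partitions):
    """Map a bytes key to a partition; equal keys always map to the same one."""
    # zlib.crc32 is a stand-in hash for illustration, not Afkak's algorithm.
    return partitions[zlib.crc32(key) % len(partitions)]


def round_robin(partitions):
    """Return a function that cycles through partitions, ignoring any key."""
    cycle = itertools.cycle(partitions)
    return lambda key=None: next(cycle)


partitions = [0, 1, 2, 3]

# The same key always lands on the same partition.
assert hashed_partition(b"key1", partitions) == hashed_partition(b"key1", partitions)

# Round robin spreads messages evenly regardless of key.
next_partition = round_robin(partitions)
print([next_partition() for _ in range(5)])  # [0, 1, 2, 3, 0]
```

With a hashed strategy, all messages sharing a key end up on one partition and so stay in order relative to each other. Round robin gives up that property in exchange for an even spread.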
Low level
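    from afkak.client import KafkaClient
    # Assumption: ProduceRequest is importable from afkak.common; the
    # original snippet does not show where it or KafkaProtocol come from.
    from afkak.common import ProduceRequest

    kafka = KafkaClient("localhost:9092")
    req = ProduceRequest(topic="my-topic", partition=1,
                         messages=[KafkaProtocol.encode_message(b"some message")])
    # Send through the client instance created above
    resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
    kafka.close()

    resps[0].topic      # b"my-topic"
    resps[0].partition  # 1
    resps[0].error      # 0 (hopefully)
    resps[0].offset     # offset of the first message sent in this request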
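When fail_on_error is not used, each response's error field has to be inspected by hand. The sketch below shows one way to do that, assuming only the four attributes listed above. `FakeProduceResponse` is a hypothetical stand-in so the example runs without a broker, and `failed_responses` is not an Afkak function.

```python
from collections import namedtuple

# Stand-in for the response objects; field names mirror the attributes above.
FakeProduceResponse = namedtuple(
    "FakeProduceResponse", ["topic", "partition", "error", "offset"])


def failed_responses(resps):
    """Return the responses whose error code is nonzero (0 means success)."""
    return [r for r in resps if r.error != 0]


resps = [
    FakeProduceResponse(b"my-topic", 1, 0, 42),
    # 6 is just an arbitrary nonzero code chosen for the demo.
    FakeProduceResponse(b"my-topic", 2, 6, -1),
]

for bad in failed_responses(resps):
    print("produce failed:", bad.topic, bad.partition, bad.error)
```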
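Install
Afkak releases are available on PyPI.
Because the Afkak dependencies Twisted and python-snappy have binary extension modules, you will need to install the Python development headers for the interpreter you wish to use: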
- Debian/Ubuntu: (install command missing)
- OS X: (install command missing)
Then Afkak can be installed with pip as usual:

    pip install afkak
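License
Copyright 2013, 2014, 2015 David Arthur under Apache License, v2.0. See LICENSE
Copyright 2014, 2015 Cyan, Inc. under Apache License, v2.0. See LICENSE
Copyright 2015, 2016, 2017, 2018, 2019 Ciena Corporation under Apache License, v2.0. See LICENSE
This project began as a port of the kafka-python library to Twisted.
See AUTHORS.md for the full list of contributors.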