用于与apache kafka交互的python实用程序
ctodd-python-lib-kafka的Python项目详细描述
Christopher H.Todd与Kafka交互的Python库
ctodd python lib kafka项目负责与apache kafka交互。这包括从主题中生成和使用记录、使用.avro格式以及使用python创建事件驱动应用程序的其他任务。
目录
依赖关系
python包
- 合流kafka==0.11.6
- simplejson==3.16.0
库
kafka_admin_helpers.py
此库用于与kafka管理功能交互。这个 包括获取将返回有关kafka的详细信息的管理对象 国家。
功能:
def get_kafka_admin_client(kafka_brokers):
"""
Purpose:
Get a Kafka Admin Client Object. Allows for polling information about Kafka
configuration and creating objects in Kafka
Args:
kafka_brokers (List of Strings): List of host:port combinations for kakfa
brokers
Return:
kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
brokers
"""
kafka_consumer_helpers.py
这个库用于帮助创建卡夫卡消费者。
功能:
def get_kafka_consumer(
kafka_brokers,
consumer_group="default",
timeout=6000,
offset_start="latest",
get_stats=True
):
"""
Purpose:
Get a Kafka Consumer Object (not yet connected to a topic)
Args:
kafka_brokers (List of Strings): List of host:port combinations for kakfa brokers
consumer_group (String): Consumer group to consume as. default is "default"
timeout (String): Timeout in ms if no messages are found (during poll). Default
is 6000
offset_start (String): Where to start consuming with respect to the consumer
group/topic offset. Default is "latest", which ignores any messages in the
topic before the consumer begins consuming
get_stats (Bool): Whether or not to print statistics. Default is True
Return:
kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
"""
def consume_topic(kafka_consumer, kafka_topics):
"""
Purpose:
Consume Kafka Topics
Args:
kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
kafka_topics (List of Strings): List of Kafka Topics to Consume.
Yields:
msg (Kafka Message Obj): Message Obj returned from the topic
"""
kafka_exceptions.py
保存将由kafka_helpers库生成的自定义异常类型的文件
课程:
class TopicNotFound(Exception):
"""
Purpose:
The TopicNotFound will be raised when attempting to consume a topic that
does not exist
"""
kafka_general_helpers.py
此库用于与卡夫卡交互,而不是与使用或生成消息相关
功能:
不适用
kafka_producer_helpers.py
这个库用于帮助创建卡夫卡制作者。
功能:
def get_kafka_producer(kafka_brokers, get_stats=True):
"""
Purpose:
Get a Kafka Producer Object (not yet connected to a topic)
Args:
kafka_brokers (List of Strings): List of host:port combinations for kakfa brokers
get_stats (Bool): Whether or not to print statistics. Default is True
Return:
kafka_producer (Kafka Producer Obj): Kafka Producer Object
"""
def produce_message(kafka_producer, kafka_topic, msg):
"""
Purpose:
Consume Kafka Topics
Args:
kafka_producer (Kafka Producer Obj): Kafka Producer Object
kafka_topic (String): Kafka Topic to Produce message to.
msg (String): Message to produce to Kafka
Returns:
N/A
"""
def produce_results_callback(err, msg):
"""
Purpose:
Optional per-message delivery callback (triggered by poll() or
flush()) when a message has been successfully delivered or
permanently failed delivery (after retries).
Args:
err (String): Error Message
msg (Object): Kafka Callback Message Object
Return:
N/A
"""
kafka_topic_helpers.py
此库用于与卡夫卡主题交互。这包括 主题列表,查找主题的详细信息,创建主题,以及 更多。
功能:
def get_topics(kafka_admin_client, return_system_topics=False):
"""
Purpose:
Get a List of Kafka Topics.
Args:
kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
brokers
Return:
kafka_topics (Dict of Kafka Topics): Key is the topic name and value is a
Kafka metadata object that has basic topic information
"""
def create_kafka_topic(
kafka_admin_client, topic_name, topic_replication=1, topic_partitions=1
):
"""
Purpose:
Create a Kafka Topic
Args:
kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
brokers
topic_name (String): Name of the topic to create
topic_replication (Int): Replication factor for the new topic
topic_partitions (Int): Number of partitions to devide the topic into
Return:
N/A
"""
脚本示例
用于测试和与库交互的示例可执行python脚本/模块。这些示例显示了库的用例,可以用作与库一起开发的模板,也可以用作一次性开发工作。
consume_from_kafka_topic.py
Purpose:
Consume from a Kafka Topic
Steps:
- Connect to Kafka
- Create Consumer Object
- Poll Topic
- Parse Message
- Print Message
example script call:
python3 consume_from_kafka_topic.py --topic="test-env-topic" \
--broker="0.0.0.0:9092" --consumer-group="test-env-consumer"
produce_to_kafka_topic.py
Purpose:
Produce to a Kafka Topic
Steps:
- Connect to Kafka
- Create Producer Object
- Prompt for Input
- Parse Input
- Produce Input to Kafka
example script call:
python3 produce_to_kafka_topic.py --topic="test-env-topic" \
--broker="localhost:9092"
create_kakfa_topic.py
Purpose:
Create a Kafka Topic. Takes in replication and parition information
Steps:
- Connect to Kafka
- Create Kafka Admin Client
- Create Topic In Kafka
function call:
---
example script call:
python3 create_kafka_topic.py --topic-name="test-env-topic" \
--topic-replication=3 --topic-partitions=4 \
--broker="localhost:9092"
注释
- 依赖于f-string符号,它仅限于python3.6。通过重构删除这些内容,可以使用python3.0.x到3.5.x进行开发
待办事项
- UnitTest框架已就位,但缺少测试