用于与apache kafka交互的python实用程序

ctodd-python-lib-kafka的Python项目详细描述


Christopher H.Todd与Kafka交互的Python库

ctodd python lib kafka项目负责与apache kafka交互。这包括从主题中生成和使用记录、使用.avro格式以及使用python创建事件驱动应用程序的其他任务。

目录

依赖关系

python包

  • 合流kafka==0.11.6
  • simplejson==3.16.0

kafka_admin_helpers.py

此库用于与kafka管理功能交互。这个 包括获取将返回有关kafka的详细信息的管理对象 国家。

功能:

def get_kafka_admin_client(kafka_brokers):
    """
    Purpose:
        Get a Kafka Admin Client Object. Allows for polling information about Kafka
        configuration and creating objects in Kafka
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for kakfa
            brokers
    Return:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
    """

kafka_consumer_helpers.py

这个库用于帮助创建卡夫卡消费者。

功能:

def get_kafka_consumer(
    kafka_brokers,
    consumer_group="default",
    timeout=6000,
    offset_start="latest",
    get_stats=True
):
    """
    Purpose:
        Get a Kafka Consumer Object (not yet connected to a topic)
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for kakfa brokers
        consumer_group (String): Consumer group to consume as. default is "default"
        timeout (String): Timeout in ms if no messages are found (during poll). Default
            is 6000
        offset_start (String): Where to start consuming with respect to the consumer
            group/topic offset. Default is "latest", which ignores any messages in the
            topic before the consumer begins consuming
        get_stats (Bool): Whether or not to print statistics. Default is True
    Return:
        kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
    """
def consume_topic(kafka_consumer, kafka_topics):
    """
    Purpose:
        Consume Kafka Topics
    Args:
        kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
        kafka_topics (List of Strings): List of Kafka Topics to Consume.
    Yields:
        msg (Kafka Message Obj): Message Obj returned from the topic
    """

kafka_exceptions.py

保存将由kafka_helpers库生成的自定义异常类型的文件

课程:

class TopicNotFound(Exception):
    """
    Purpose:
        The TopicNotFound will be raised when attempting to consume a topic that
        does not exist
    """

kafka_general_helpers.py

此库用于与卡夫卡交互,而不是与使用或生成消息相关

功能:

不适用

kafka_producer_helpers.py

这个库用于帮助创建卡夫卡制作者。

功能:

def get_kafka_producer(kafka_brokers, get_stats=True):
    """
    Purpose:
        Get a Kafka Producer Object (not yet connected to a topic)
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for kakfa brokers
        get_stats (Bool): Whether or not to print statistics. Default is True
    Return:
        kafka_producer (Kafka Producer Obj): Kafka Producer Object
    """
def produce_message(kafka_producer, kafka_topic, msg):
    """
    Purpose:
        Consume Kafka Topics
    Args:
        kafka_producer (Kafka Producer Obj): Kafka Producer Object
        kafka_topic (String): Kafka Topic to Produce message to.
        msg (String): Message to produce to Kafka
    Returns:
        N/A
    """
def produce_results_callback(err, msg):
    """
    Purpose:
        Optional per-message delivery callback (triggered by poll() or
        flush()) when a message has been successfully delivered or
        permanently failed delivery (after retries).
    Args:
        err (String): Error Message
        msg (Object): Kafka Callback Message Object
    Return:
        N/A
    """

kafka_topic_helpers.py

此库用于与卡夫卡主题交互。这包括 主题列表,查找主题的详细信息,创建主题,以及 更多。

功能:

def get_topics(kafka_admin_client, return_system_topics=False):
    """
    Purpose:
        Get a List of Kafka Topics.
    Args:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
    Return:
        kafka_topics (Dict of Kafka Topics): Key is the topic name and value is a
            Kafka metadata object that has basic topic information
    """
def create_kafka_topic(
    kafka_admin_client, topic_name, topic_replication=1, topic_partitions=1
):
    """
    Purpose:
        Create a Kafka Topic
    Args:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
        topic_name (String): Name of the topic to create
        topic_replication (Int): Replication factor for the new topic
        topic_partitions (Int): Number of partitions to devide the topic into
    Return:
        N/A
    """

脚本示例

用于测试和与库交互的示例可执行python脚本/模块。这些示例显示了库的用例,可以用作与库一起开发的模板,也可以用作一次性开发工作。

consume_from_kafka_topic.py

    Purpose:
        Consume from a Kafka Topic

    Steps:
        - Connect to Kafka
        - Create Consumer Object
        - Poll Topic
        - Parse Message
        - Print Message

    example script call:
        python3 consume_from_kafka_topic.py --topic="test-env-topic" \
            --broker="0.0.0.0:9092" --consumer-group="test-env-consumer"

produce_to_kafka_topic.py

    Purpose:
        Produce to a Kafka Topic

    Steps:
        - Connect to Kafka
        - Create Producer Object
        - Prompt for Input
        - Parse Input
        - Produce Input to Kafka

    example script call:
        python3 produce_to_kafka_topic.py --topic="test-env-topic" \
            --broker="localhost:9092"

create_kakfa_topic.py

    Purpose:
        Create a Kafka Topic. Takes in replication and parition information

    Steps:
        - Connect to Kafka
        - Create Kafka Admin Client
        - Create Topic In Kafka

    function call:
        ---
    example script call:
        python3 create_kafka_topic.py --topic-name="test-env-topic" \
            --topic-replication=3 --topic-partitions=4 \
            --broker="localhost:9092"

注释

  • 依赖于f-string符号,它仅限于python3.6。通过重构删除这些内容,可以使用python3.0.x到3.5.x进行开发

待办事项

  • UnitTest框架已就位,但缺少测试

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何修复使用高停止条件时的StackOverflowerError   java两个非常好的int被除掉,仍然返回0   java将SpringWeb应用程序(Web.xml)迁移到Springboot 1.5.10   java使用CQL查询的结果集检索整行   java Solr 7:当某些请求命中Solr时,QueuedThreadPool线程数很高   在ActionListener中访问另一个类时发生java NullPointerException   异常处理Java重构类似方法的代码   java Hi我需要帮助在我的JSP页面中传输图像   Android中的java工作线程   覆盖字段值的JavaDB模型策略   带有resteasy的java Spring引导“找不到名为requestMappingHandlerMapping的bean的类型”错误   java如何插入(int)和(date)类型?   Java Swing计时器和ActionEvent   java运行一个没有jUnit作为运行选项的类   java通过解析异常来获取方法名及其包含的参数   与枚举匹配的java Get-from列表元素   我的程序中出现java内存不足错误   java在C中创建jobject不起作用   如何在java中测试这个void方法?