Kafka Avro binary consumer with Postgres configuration

Detailed description of the kafka-avro-binary-consumer Python project


First, you need to start the Confluent Kafka server. See this article to learn how to do that: https://docs.confluent.io/3.0.0/control-center/docs/quickstart.html

The purpose of this project is:

- consume binary Avro messages and split them into different topics

To install this project, you need to:

- install all of the packages listed in the package folder
- set up a Postgres server and execute the create_config_tables.sql and insert_to_config_tables.sql files (a sketch of how to apply them follows this list)
- put binary_avro_consumer.py and conf.cnf on the server and run the Python file with: python3.6 binary_avro_consumer.py (arguments)
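
A minimal sketch of the Postgres step, assuming the psycopg2 driver is installed; the connection parameters below are placeholders, and the actual DDL and inserts live in the two .sql files shipped with the project:

    # Sketch only: apply the project's SQL files to Postgres with psycopg2.
    # The connection parameters are placeholders, not part of the project.
    import psycopg2

    conn = psycopg2.connect(dbname="kafka_config", user="postgres",
                            password="postgres", host="localhost")
    with conn:
        with conn.cursor() as cur:
            for path in ("create_config_tables.sql", "insert_to_config_tables.sql"):
                with open(path) as sql_file:
                    cur.execute(sql_file.read())
    conn.close()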

Details about the console execution parameters:

The create table statements are stored in the create_config_tables.sql file. The insert statements for the config tables are stored in the insert_to_config_tables.sql file. You should execute the create statements and insert your own settings into these tables. The tables have the following structure:

  config_key         |  config_value
 --------------------+----------------


  topic_name   | field_name
 --------------+-------------

Below is an example of how they are populated (a sketch of loading them follows the two example tables):

--The config_key is the key of a setting; here is an explanation of what each key means--

     bootstrap_server_from - the bootstrap server the messages are read from; it can appear multiple times in the DB, because Kafka is a cluster and there may be several bootstrap servers.
     bootstrap_server_from_port - the port of those bootstrap servers; usually all bootstrap servers use the same port.
     schema_registry - the Schema Registry URL; it should start with http:// or https://
     schema_registry_port - the Schema Registry port
     topic_read - the topic the messages are read from, i.e. a topic on the `bootstrap_server_from` server.
     group_id - usually the default name `example_avro`; this parameter is required for consuming
     bootstrap_server_to - the server the messages read and modified from `bootstrap_server_from` are written to
     bootstrap_server_to_port - the port of `bootstrap_server_to`
     from_beginning - start consuming from the beginning: 1 - true, 0 - false
     count_messages_consume - the number of messages consumed per iteration

             config_key         |  config_value
    ----------------------------+----------------
     bootstrap_server_from      | localhost
     bootstrap_server_from_port | 9092
     schema_registry            | http://0.0.0.0
     schema_registry_port       | 8081
     topic_read                 | avro-test
     group_id                   | example_avro
     bootstrap_server_to        | localhost
     bootstrap_server_to_port   | 9092
     from_beginning             | 1
     count_messages_consume     | 100

  topic_name   | field_name
 --------------+-------------
  first_topic  | uid
  first_topic  | somefield
  second_topic | options hel
  second_topic | options mel
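
A rough illustration of how a consumer script could load these two tables; the column names are the ones shown above, but the connection parameters and the table names "config" and "topic_fields" are assumptions for the sketch, not necessarily what binary_avro_consumer.py uses:

    # Sketch only: read the two config tables and build lookup structures.
    # Table names "config" and "topic_fields" are hypothetical; only the
    # column names come from the project description.
    import psycopg2

    conn = psycopg2.connect(dbname="kafka_config", user="postgres",
                            password="postgres", host="localhost")
    cur = conn.cursor()

    cur.execute("SELECT config_key, config_value FROM config")
    settings = {}
    for key, value in cur.fetchall():
        # bootstrap_server_from may appear several times (one row per broker)
        settings.setdefault(key, []).append(value)

    cur.execute("SELECT topic_name, field_name FROM topic_fields")
    topic_fields = {}
    for topic, field in cur.fetchall():
        topic_fields.setdefault(topic, []).append(field)

    conn.close()

    # e.g. {'first_topic': ['uid', 'somefield'],
    #       'second_topic': ['options hel', 'options mel']}
    print(topic_fields)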


For example, suppose you have the following Avro schema:

  "namespace" : "my.com.ns",
  "name": "myrecord",
  "type" :  "record",
  "fields" : [
     {"name": "uid", "type": "int"},
     {"name": "somefield", "type": "string"},
     {"name": "options", "type": {
        "type": "array",
        "items": {
            "type": "record",
            "name": "lvl2_record",
            "fields": [
                {"name": "hel", "type": "string"},
                {"name": "mel", "type": "string"}
                 }
               ]
            }
        }
     }

  ]
}

You need to extract the following values from this schema:

uid, somefield, options->hel and options->mel, and you need to store these values in first_topic and second_topic. For example, uid and somefield are stored in first_topic, and options->hel and options->mel in second_topic. The notation options->hel and options->mel means that the field hel is a child of options, and the same for mel.

So we write to the DB: first_topic uid,somefield, which means: store uid and somefield in first_topic; the same for second_topic. A sketch of this splitting step follows.
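
A minimal sketch of the splitting step, assuming the message has already been decoded into a Python dict matching the schema above; the helper name is made up for illustration, and the "parent child" convention for nested fields follows the topic_fields example (the real logic lives in binary_avro_consumer.py):

    # Sketch only: split one decoded record into per-topic payloads
    # according to the topic_name/field_name mapping.
    def split_record(record, topic_fields):
        payloads = {}
        for topic, fields in topic_fields.items():
            payload = {}
            for field in fields:
                parts = field.split()      # "options hel" -> parent "options", child "hel"
                if len(parts) == 1:
                    payload[field] = record[field]
                else:
                    parent, child = parts
                    # collect the child value from every element of the nested array
                    payload[child] = [item[child] for item in record[parent]]
            payloads[topic] = payload
        return payloads

    record = {"uid": 1, "somefield": "abc",
              "options": [{"hel": "h1", "mel": "m1"}, {"hel": "h2", "mel": "m2"}]}
    mapping = {"first_topic": ["uid", "somefield"],
               "second_topic": ["options hel", "options mel"]}
    print(split_record(record, mapping))
    # {'first_topic': {'uid': 1, 'somefield': 'abc'},
    #  'second_topic': {'hel': ['h1', 'h2'], 'mel': ['m1', 'm2']}}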

How do you find out where the Avro schema is actually stored in the Schema Registry? Here is the answer:

Suppose you created a topic named test and registered its schema in the Schema Registry. To find out what the schema is, and to track whether it has changed, execute this command in a terminal (the Schema Registry server must be running, and instead of http://localhost:8081/subjects you should put your own Schema Registry URL):

    curl -X GET http://localhost:8081/subjects

Output of curl: ["Kafka-value","Kafka-key","test-value"]

You can see that creating the test topic also created a 'test-value' subject, so the schema you need is at
http://localhost:8081/subjects/test-value/versions/latest
In some cases your schema may live under the `Kafka-value` path, so your URL would be: http://localhost:8081/subjects/Kafka-value/versions/latest
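
The same lookup can also be done from Python; a small sketch using the requests library (the subject name test-value is taken from the example above):

    # Sketch only: fetch the latest registered schema for a subject.
    import json
    import requests

    url = "http://localhost:8081/subjects/test-value/versions/latest"
    resp = requests.get(url)
    resp.raise_for_status()
    body = resp.json()
    # The registry returns the subject, version, id and the schema itself as a JSON string.
    schema = json.loads(body["schema"])
    print(body["version"], schema["name"])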

Put this URL in the conf.cnf file. More about conf.cnf:

conf.cnf is the config file for the Python script (a parsing sketch follows this list):

    On the first line is the Schema Registry URL.
    On the second line is the DB name.
    On the third line is the username.
    On the fourth line is the password.
    On the sixth line is the host.
    On the seventh line is the number of Avro messages for AvroProducer to produce.
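
A sketch of reading those positional lines (purely illustrative; the fifth line is not described above, so it is skipped here, and binary_avro_consumer.py may parse the file differently):

    # Sketch only: read conf.cnf line by line, using the positions described above.
    with open("conf.cnf") as f:
        lines = [line.strip() for line in f]

    schema_registry_url = lines[0]       # first line
    db_name = lines[1]                   # second line
    db_user = lines[2]                   # third line
    db_password = lines[3]               # fourth line
    # lines[4] (fifth line) is not documented above
    db_host = lines[5]                   # sixth line
    messages_to_produce = int(lines[6])  # seventh line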

After all of that, you need to start the script named pushpop_complex_avro.py with a command like python3.6 pushpop_complex_avro.py (optional params -d, -i and -e, for debug, info and error logging respectively). Once the script is running it waits for messages, so you need to produce messages for the topic being read: python3.6 avro_producer.py. You will see the messages being split and routed to the first and second topics.
