Convert a JSON array to JSON objects with KStream or KSQL (Java)

I have data in the following format coming into Kafka:

{"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}

I want to turn it into this:

{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}
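
In other words, every element of the array stored under "WHS" should become its own JSON object. As a minimal, stdlib-only sketch of that unwrapping step (assuming Java 17; the class JsonArrayUnwrapper is hypothetical and is a brace-depth scanner that skips quoted strings, not a full JSON parser, and it assumes the array elements are objects):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Kafka, Kafka Streams or KSQL).
 * Returns each element of the JSON array stored under {@code key} as raw JSON text.
 * Minimal brace-depth scanner: it skips over quoted strings (including escapes)
 * but does not validate JSON, and it only collects object/array elements.
 */
final class JsonArrayUnwrapper {

    private JsonArrayUnwrapper() {}

    static List<String> unwrap(String json, String key) {
        String marker = "\"" + key + "\"";
        int keyPos = json.indexOf(marker); // naive: assumes the key text doesn't appear earlier inside a value
        if (keyPos < 0) {
            throw new IllegalArgumentException("key not found: " + key);
        }
        int arrayStart = json.indexOf('[', keyPos + marker.length());
        if (arrayStart < 0) {
            throw new IllegalArgumentException("no array after key: " + key);
        }

        List<String> elements = new ArrayList<>();
        int depth = 0;          // nesting depth inside the array
        int elementStart = -1;  // index of the current element's opening brace
        boolean inString = false;
        boolean escaped = false;

        for (int i = arrayStart + 1; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            switch (c) {
                case '"' -> inString = true;
                case '{', '[' -> {
                    if (depth == 0) {
                        elementStart = i;
                    }
                    depth++;
                }
                case '}', ']' -> {
                    if (depth == 0) {
                        return elements; // the ']' that closes the array itself
                    }
                    depth--;
                    if (depth == 0) {
                        elements.add(json.substring(elementStart, i + 1));
                    }
                }
                default -> { } // commas, colons, whitespace, scalar values
            }
        }
        throw new IllegalArgumentException("unterminated array");
    }

    public static void main(String[] args) {
        String value = "{\"WHS\":[{\"Character Set\":\"UTF-8\",\"action\":\"finished\",\"Number\":0}]}";
        for (String element : unwrap(value, "WHS")) {
            System.out.println(element); // {"Character Set":"UTF-8","action":"finished","Number":0}
        }
    }
}
```

In a real application you would more likely parse the value with a JSON library such as Jackson, or let KSQL do the work as in the answer below.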

I tried to flatten it with KSQL, but KSQL doesn't support arrays yet.

I tried to flatten it with KStream using the following code:

builder.stream(inputTopic).flatMapValues(Object -> Arrays.asList()).to(outputTopic);

But it doesn't produce any output. Any help would be appreciated.
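
The reason that KStream attempt writes nothing: flatMapValues emits one output record per element of the Iterable its mapper returns, and Arrays.asList() called with no arguments returns an empty list, so every input record becomes zero output records. The stdlib-only Java sketch below models that contract with a hypothetical FlatMapValuesModel class (it is not the Kafka Streams API, and it assumes the value has already been deserialized into a Map); in a real topology the mapper would have to return the parsed elements of the WHS array instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical, stdlib-only model of the contract of KStream#flatMapValues:
 * each input value is replaced by the elements of the Iterable the mapper returns,
 * one output record per element. This is an illustration, not Kafka Streams code.
 */
final class FlatMapValuesModel {

    private FlatMapValuesModel() {}

    static <V, VR> List<VR> flatMapValues(List<V> input,
                                          Function<? super V, ? extends Iterable<? extends VR>> mapper) {
        List<VR> output = new ArrayList<>();
        for (V value : input) {
            for (VR result : mapper.apply(value)) { // zero elements -> zero output records
                output.add(result);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // One input record, already deserialized into a Map (assumption for this sketch).
        Map<String, Object> element = new LinkedHashMap<>();
        element.put("action", "finished");
        element.put("Number", 0);
        List<Map<String, List<Map<String, Object>>>> topic = List.of(Map.of("WHS", List.of(element)));

        // The mapper from the question: Arrays.asList() with no arguments is an empty list.
        List<Object> fromQuestion = flatMapValues(topic, v -> Arrays.asList());
        System.out.println(fromQuestion.size()); // 0

        // Returning the array itself emits one record per WHS element instead.
        List<Map<String, Object>> flattened = flatMapValues(topic, v -> v.get("WHS"));
        System.out.println(flattened); // [{action=finished, Number=0}]
    }
}
```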


1 Answer

  1. # Answer 1

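    KSQL/ksqlDB does support arrays. Here's how to do what you want with it (note that the schema below doesn't declare "Character Set" or "Addr-3", so those fields won't appear in the output unless you add them to the STRUCT):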

    -- Declare the stream
    CREATE STREAM TEST1 
        (WHS ARRAY<STRUCT<"action"           VARCHAR
                        , "Update-Date-Time" VARCHAR
                        , "Number"           VARCHAR
                        , "Abbr"             VARCHAR
                        , "Name"             VARCHAR
                        , "Name2"            VARCHAR
                        , "Country-Code"     VARCHAR
                        , "Addr-1"           VARCHAR
                        , "Addr-2"           VARCHAR
                        , "Addr-4"           VARCHAR
                        , "City"             VARCHAR
                        , "State"            VARCHAR>>) 
        WITH (KAFKA_TOPIC ='test1'
             ,VALUE_FORMAT='JSON');
    
    -- Make queries read from the beginning of the topic
    SET 'auto.offset.reset' = 'earliest';
    
    -- Query the array
    ksql> SELECT WHS FROM TEST1 EMIT CHANGES LIMIT 1;
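    +------------------------------------------------------------------------------------------------------------------------------------------------------+
    |WHS                                                                                                                                                   |
    +------------------------------------------------------------------------------------------------------------------------------------------------------+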
    |[{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=}] |
    Limit Reached
    Query terminated
    ksql>         
    
    -- Flatten the array
    ksql> SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES LIMIT 1;
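    +----------------------------------------------------------------------------------------------------------------------------------------------------+
    |KSQL_COL_0                                                                                                                                          |
    +----------------------------------------------------------------------------------------------------------------------------------------------------+
    |{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=} |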
    Limit Reached
    Query terminated
    ksql>
    

    You can write the result to another stream (topic):

    ksql> CREATE STREAM TEST1_EXPLODE WITH (KAFKA_TOPIC='NEW_TEST1') AS SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES;
    
     Message
    ----------------------------------------------
     Created query with ID CSAS_TEST1_EXPLODE_155
    ----------------------------------------------
    ksql> PRINT NEW_TEST1;
    …
    Value format: JSON or KAFKA_STRING
    rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"KSQL_COL_0":{"ACTION":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","NUMBER":"0","ABBR":"","NAME":"","NAME2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","STATE":""}}
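
    EXPLODE(WHS) emits one row per array element. Because the query gives the expression no alias, ksqlDB names the resulting column KSQL_COL_0, which is why each printed value is the element struct nested one level under that name; an alias such as EXPLODE(WHS) AS WHS_ROW (WHS_ROW is a hypothetical name) would only rename that column. A small hypothetical Java model of that output shape, not ksqlDB code:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model (not ksqlDB code) of what SELECT EXPLODE(WHS) writes for one
 * input record: one output value per array element, each wrapped in a single column.
 */
final class ExplodeModel {

    private ExplodeModel() {}

    static List<Map<String, Object>> explode(List<Map<String, Object>> whs, String columnName) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (Map<String, Object> element : whs) {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put(columnName, element); // the whole struct becomes the value of one column
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Object> element = new LinkedHashMap<>();
        element.put("ACTION", "finished");
        element.put("NUMBER", "0");
        // Without an alias the column is named KSQL_COL_0, as in the PRINT output above.
        System.out.println(explode(List.of(element), "KSQL_COL_0"));
        // prints [{KSQL_COL_0={ACTION=finished, NUMBER=0}}]
    }
}
```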
    

    If you want to flatten the resulting struct as well, you can do that too:

    CREATE STREAM TEST1_FLATTENED AS SELECT  EXPLODE(WHS)->"action"           AS "action"           ,
            EXPLODE(WHS)->"Update-Date-Time" AS "Update-Date-Time" ,
            EXPLODE(WHS)->"Number"           AS "Number"           ,
            EXPLODE(WHS)->"Abbr"             AS "Abbr"             ,
            EXPLODE(WHS)->"Name"             AS "Name"             ,
            EXPLODE(WHS)->"Name2"            AS "Name2"            ,
            EXPLODE(WHS)->"Country-Code"     AS "Country-Code"     ,
            EXPLODE(WHS)->"Addr-1"           AS "Addr-1"           ,
            EXPLODE(WHS)->"Addr-2"           AS "Addr-2"           ,
            EXPLODE(WHS)->"Addr-4"           AS "Addr-4"           ,
            EXPLODE(WHS)->"City"             AS "City"             ,
            EXPLODE(WHS)->"State"            AS "State"
        FROM TEST1 EMIT CHANGES;
    
    ksql> PRINT TEST1_FLATTENED;
    …
    Value format: JSON or KAFKA_STRING
    rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":"0","Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","State":""}
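
    The flattening query calls EXPLODE(WHS) once per field. As I understand the ksqlDB documentation on table functions, when a SELECT contains several table functions their outputs are zipped position by position: the row count equals the longest output, and shorter outputs are padded with null. Since every call here explodes the same array, row i takes each field from element i. A hypothetical Java model of that zipping (not ksqlDB code; the sample values are placeholders):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of how several table functions in one SELECT are zipped. */
final class TableFunctionZip {

    private TableFunctionZip() {}

    /**
     * Row i holds the i-th value of each output, or null where an output
     * produced fewer than i + 1 values.
     */
    static List<List<Object>> zip(List<List<?>> outputs) {
        int rowCount = 0;
        for (List<?> out : outputs) {
            rowCount = Math.max(rowCount, out.size());
        }
        List<List<Object>> rows = new ArrayList<>();
        for (int i = 0; i < rowCount; i++) {
            List<Object> row = new ArrayList<>();
            for (List<?> out : outputs) {
                row.add(i < out.size() ? out.get(i) : null);
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Stand-ins for EXPLODE(WHS)->"action" and EXPLODE(WHS)->"City" over a two-element array.
        List<String> actions = List.of("action-0", "action-1");
        List<String> cities = List.of("city-0", "city-1");
        System.out.println(zip(List.of(actions, cities)));
        // prints [[action-0, city-0], [action-1, city-1]]
    }
}
```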