有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java读取来自Kafka主题的消息并将其转储到HDFS中

我试图使用卡夫卡主题中的数据,将其加载到数据集,然后在加载到Hdfs之前执行筛选

我可以使用卡夫卡主题,将其加载到数据集中,并在HDFS中另存为拼花地板文件,但无法执行过滤条件。您能分享一下在保存到hdfs之前执行过滤的方法吗? 我正在使用Java和Spark来使用卡夫卡主题。 我的部分代码如下:

DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

 ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

StreamingQuery query = ds.coalesce(10)
                .writeStream()
                .format("parquet")
                .option("path", path.toString())
                .option("checkpointLocation", "<your path>")
                .trigger(Trigger.Once())
                .start();

共 (2) 个答案

  1. # 1 楼答案

    coalesce之前写入过滤器逻辑,即ds.filter().coalesce()

    
    DataframeDeserializer dataframe = new DataframeDeserializer(dataset);
    
     ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);
    
    StreamingQuery query = 
                    ds
                    .filter(...) // Write your filter condition here
                    .coalesce(10)
                    .writeStream()
                    .format("parquet")
                    .option("path", path.toString())
                    .option("checkpointLocation", "<your path>")
                    .trigger(Trigger.Once())
                    .start();