java读取来自Kafka主题的消息并将其转储到HDFS中
我试图使用卡夫卡主题中的数据,将其加载到数据集,然后在加载到Hdfs之前执行筛选
我可以使用卡夫卡主题,将其加载到数据集中,并在HDFS中另存为拼花地板文件,但无法执行过滤条件。您能分享一下在保存到hdfs之前执行过滤的方法吗? 我正在使用Java和Spark来使用卡夫卡主题。 我的部分代码如下:
DataframeDeserializer dataframe = new DataframeDeserializer(dataset);
ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);
StreamingQuery query = ds.coalesce(10)
.writeStream()
.format("parquet")
.option("path", path.toString())
.option("checkpointLocation", "<your path>")
.trigger(Trigger.Once())
.start();
# 1 楼答案
在
coalesce
之前写入过滤器逻辑,即ds.filter().coalesce()
# 2 楼答案
与其重新发明轮子,我强烈建议Kafka Connect。 您只需要HDFS接收器连接器,它可以将卡夫卡主题中的数据复制到HDFS