如何使用pyspark读取hdfs-kafka数据?

2024-09-29 21:30:22 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试读取通过Kafka和SparkStreaming存储到HDFS的数据。在

我使用的是一个Java应用程序,它使用JavaRDD.saveAsTextFile文件方法。基本上是这样的:

kafkaStreams.get(i).foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception {
                        consumerRecordJavaRDD.saveAsTextFile("/tmp/abcd_" + System.currentTimeMillis());
            });

一个文本文件行被推过卡夫卡。数据被保存,我可以在默认的hadoop浏览器中看到它本地主机:50070。在

然后,在pyspark应用程序中,我尝试使用sparkContext.text文件. 在

问题是我读取的数据(使用python或“手工”在本地主机:50070)也包含元数据。所以每行如下(一个长字符串):

“ConsumerRecord(topic=abcdef,partition=0,offset=3,CreateTime=123456789,checksum=987654321,序列化键大小=-1,序列化值大小=28,key=null,value=aaaa,bbbbbb,cccc,dddddd,eeee)”

我想读取数据本身是没有意义的,然后拆分和解析长字符串以获得“value”内容不是最好的主意。在

那么我该如何解决这个问题呢?是否可以只读取“值”字段?还是储蓄本身的问题?在


Tags: kafka文件数据字符串应用程序string序列化value
2条回答

我已经解决了这个问题。

正如在原帖子的评论中提到的,我将数据保存在parquet文件格式中,该格式面向列并且易于使用。

在我看来,你这样做的顺序不对。我强烈建议您在pyspark应用程序中直接使用来自Kafka的数据。 如果您愿意,您也可以将Kafka主题写入HDFS以及(请记住,Kafka会保存数据,因此当您在pyspark中读取它时,不会更改从同一主题写入HDFS的内容)。

当数据已经在Kafka中时,将PySpark与HDFS耦合是没有意义的。

下面是一个在pyspark中直接使用Kafka数据的simple example

相关问题 更多 >

    热门问题