Python中文
首页
教程
问答
标签
搜索
登录
注册
如何使用pyspark读取hdfs-kafka数据?
回答此问题可获得
20
贡献值,回答如果被采纳可获得
50
分。
<p>我正在尝试读取通过Kafka和SparkStreaming存储到HDFS的数据。在</p> <p>我使用的是一个Java应用程序,它使用JavaRDD.saveAsTextFile文件方法。基本上是这样的:</p> <pre><code>kafkaStreams.get(i).foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() { @Override public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception { consumerRecordJavaRDD.saveAsTextFile("/tmp/abcd_" + System.currentTimeMillis()); }); </code></pre> <p>一个文本文件行被推过卡夫卡。数据被保存,我可以在默认的hadoop浏览器中看到它本地主机:50070</em>。在</p> <p>然后,在pyspark应用程序中,我尝试使用sparkContext.text文件. 在</p> <p>问题是我读取的数据(使用python或“手工”在<em>本地主机:50070</em>)也包含元数据。所以每行如下(一个长字符串):</p> <p><em>“ConsumerRecord(topic=abcdef,partition=0,offset=3,CreateTime=123456789,checksum=987654321,序列化键大小=-1,序列化值大小=28,key=null,value=aaaa,bbbbbb,cccc,dddddd,eeee)”</em></p> <p>我想读取数据本身是没有意义的,然后拆分和解析长字符串以获得“value”内容不是最好的主意。在</p> <p>那么我该如何解决这个问题呢?是否可以只读取“值”字段?还是储蓄本身的问题?在</p>
0 条评论
分类:
Python问答
请先
登录
后评论
默认排序
时间排序
1 个回答
匿名
1天前
擅长:python、mysql、java
<p>我已经解决了这个问题。</p> <p>正如在原帖子的评论中提到的,我将数据保存在parquet文件格式中,该格式面向列并且易于使用。</p>
请先
登录
后评论
针对此问题:
更多的回答
关注
89
关注
收藏
1
收藏,
216
浏览
网友 提问于 2天前
相关Python问题
我想从用户inpu创建一个类的实例
5 回答
我想从用户导入值,为此
10 回答
我想从用户那里得到一个整数输入,然后让for循环遍历该数字,然后调用一个函数多次
9 回答
我想从用户那里收到一个列表,并在其中执行一些步骤,然后在步骤完成后将其打印回来,但它没有按照我想要的方式工作
10 回答
我想从用户那里获取输入,并将值传递给(average=dict[x]/6),然后在那里获取resu
1 回答
我想从第一个列表中展示第一个词,然后从第二个列表中展示十个词,以此类推- Python
1 回答
我想从第一个空lin开始解析文本文件
2 回答
我想从简历、简历中提取特定部分
6 回答
我想从给定字典(python)的字符串中删除\u00a9、\u201d和类似的字符。
1 回答
我想从给定的网站Lin下载许多文件扩展名相同的Wget或Python文件
8 回答
我想从网上搜集一些关于抵押贷款的数据
8 回答
我想从网站上删除电子邮件地址
2 回答
我想从网站上读取数据该网站包含可下载的文件,然后我想用python脚本把它发送给oracle如何?
2 回答
我想从网站中提取数据,然后将其显示在我的网页上
4 回答
我想从网页上提取统计数据。
6 回答
我想从网页上解析首都城市,并在用户输入国家时在终端上打印它们
4 回答
我想从色彩图中删除前n个颜色,而不丢失原始颜色数
4 回答
我想从课堂上打印字典里的键
7 回答
我想从费用表中获取学生上次支付的费用,其中学生id=id
3 回答
我想从较低的顺序对多重列表进行排序,但我无法在一行中生成结果
5 回答