Spark流打印DStream的前k个结果

2024-09-24 00:24:07 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在接触hadoop平台,我正在试验的是Spark流式API。我试图读取一个文件流,以计数每x秒后的字数(历史的累计和)。现在我想把前k个单词打印到一个文件中。以下是我要做的:

# sort the dstream for current batch
sorted_counts = counts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))

# get the top K values of each rdd from the transformed dstream
topK = sorted_counts.transform(lambda rdd: rdd.take(k))

我可以使用以下方法将输出打印到控制台/日志文件:

^{pr2}$

但问题是当我尝试使用以下方法将其打印到文件中时:

topK.saveAsTextFiles(out_path)

或者即使我试图将topK打印到控制台:

topK.pprint()

我得到以下错误

AttributeError: 'list' object has no attribute '_jrdd'

我想是因为rdd.take公司(k) 返回实际列表而不是rdd。我该怎么解决呢?我还想为每个新计算的单词数生成不同的文件。。。i、 e.每x秒一个新的输出文件(由saveAsTextFiles()保证)。如果有帮助的话,我正在使用python编写这个程序。谢谢!在


Tags: 文件the方法lambdahadooptransform平台单词
1条回答
网友
1楼 · 发布于 2024-09-24 00:24:07

似乎没有API可以让您这样做。但您可以解决以下问题:

rdd.zipWithIndex().filter(<filter with big index>).map(<remove index here>)

另一种解决方案是(不进行排序):

^{pr2}$

这样您就不需要对所有的RDD进行排序,而只需要获取最大的元素,然后从中创建RDD。在

相关问题 更多 >