我正在接触hadoop平台,我正在试验的是Spark流式API。我试图读取一个文件流,以计数每x秒后的字数(历史的累计和)。现在我想把前k个单词打印到一个文件中。以下是我要做的:
# sort the dstream for current batch
sorted_counts = counts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
# get the top K values of each rdd from the transformed dstream
topK = sorted_counts.transform(lambda rdd: rdd.take(k))
我可以使用以下方法将输出打印到控制台/日志文件:
^{pr2}$但问题是当我尝试使用以下方法将其打印到文件中时:
topK.saveAsTextFiles(out_path)
或者即使我试图将topK打印到控制台:
topK.pprint()
我得到以下错误
AttributeError: 'list' object has no attribute '_jrdd'
我想是因为rdd.take公司(k) 返回实际列表而不是rdd。我该怎么解决呢?我还想为每个新计算的单词数生成不同的文件。。。i、 e.每x秒一个新的输出文件(由saveAsTextFiles()保证)。如果有帮助的话,我正在使用python编写这个程序。谢谢!在
似乎没有API可以让您这样做。但您可以解决以下问题:
另一种解决方案是(不进行排序):
^{pr2}$这样您就不需要对所有的RDD进行排序,而只需要获取最大的元素,然后从中创建RDD。在
相关问题 更多 >
编程相关推荐