我刚接触Spark,不太清楚如何问这个问题(使用哪些术语,等等),下面是我在概念上试图实现的目标:
我有很多小的,单独的.txt“ledger”文件(例如,有时间戳和属性值的行分隔文件)。在
我想:
将每个“账本”文件读入单独的数据帧(读取:不合并成一个大数据帧);
对每个单独的数据帧执行一些基本计算,从而生成一行新的数据值;然后
将所有单独的结果行合并到一个最终对象中,并将其保存在以行分隔的文件中到磁盘。
似乎我找到的几乎每一个答案(当谷歌搜索相关术语时)都是关于将多个文件加载到单个RDD或数据帧中,但我确实找到了以下Scala代码:
val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename}
def doSomething(file: String) = {
println (file);
// your logic of processing a single file comes here
val logData = sc.textFile(file);
val numAs = logData.filter(line => line.contains("a")).count();
println("Lines with a: %s".format(numAs));
// save rdd of single file processed data to hdfs comes here
}
files.collect.foreach( filename => {
doSomething(filename)
})
。。。但是:
A.我不知道这是否与读取/分析操作并行,并且
我不认为它能把结果合并成一个单一的对象。在
任何方向或建议都非常感谢!在
更新
似乎我要做的(在多个文件上并行运行一个脚本,然后合并结果)可能需要类似thread pools(?)的东西。在
为了清楚起见,下面是一个我想对通过读取“ledger”文件创建的数据帧执行的计算示例:
^{pr2}$因此,像这样的分类账:
+---------+------+-------------------+-----+
| location|status| timestamp|wh_id|
+---------+------+-------------------+-----+
| PUTAWAY| I|2019-04-01 03:14:00| 20|
|PICKABLE1| X|2019-04-01 04:24:00| 20|
|PICKABLE2| X|2019-04-01 05:33:00| 20|
|PICKABLE2| A|2019-04-01 06:42:00| 20|
| HOTPICK| A|2019-04-10 05:51:00| 20|
| ICEXCEPT| A|2019-04-10 07:04:00| 20|
| ICEXCEPT| X|2019-04-11 09:28:00| 20|
+---------+------+-------------------+-----+
将减少至(假设计算于2019-04-14运行):
{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }
不建议使用
wholeTextFiles
,因为它会立即将整个文件加载到内存中。如果您真的想为每个文件创建一个单独的数据帧,您可以简单地使用完整路径而不是目录。但是,不建议这样做,而且很可能导致资源利用率低下。相反,考虑使用input_file_path
https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/functions.html#input_file_name例如:
^{pr2}$所以这些文件可以单独处理,然后再合并。在
您可以在hdfs中获取文件路径
为每个数据帧创建唯一的数据路径
^{pr2}$在联合到一个数据帧之前应用过滤器或任何转换
相关问题 更多 >
编程相关推荐