输入: abc.tar.gz->;联合国tar->;文件夹:abc
abc的文件夹结构:
根文件夹:abc包含每天每5分钟从100个城市生成的csv文件
csv文件数量:100个城市*12个文件/小时*24小时=28800个csv文件
abc/
city1_0005.csv
city1_0010.csv
..
city1_2355.csv
..
..
city2_0005.csv
city2_0010.csv
..
city2_2355.csv
..
..
city100_0005.csv
city100_0010.csv
功能要求:
技术要求: 并行读取和处理文件以获得更好的性能
我开发了以下代码来按顺序处理数据。 我正在寻找优化它的方法
staging_path="abfss://xyz/abc"
#using databricks utils to get the list of files in folder
filesProp = dbutils.fs.ls(staging_adls_path)
#extracting the city names from list of filenames
filesSet =set()
for file in filesProp:
filesSet.add(file.name.split('-')[0])
#empty list to store dataframes
dictionary_df = {}
#reading 1 city data and inserting to table
for fileName in filesSet:
filePath = staging_path+fileName+"*"
print(filePath)
dictionary_df[fileName] = spark.read.options(header='True', delimiter=',').csv(filePath)
dictionary_df[fileName].write.saveAsTable(fileName)
这就是我解决这个问题的方法
使用shell脚本将基于城市的CSV移动到/特定文件夹
由于您已经在Azure和Databricks上,我建议您使用CloudFiles数据格式,与开源结构化流媒体+csv选项相比,该格式在并行扫描数据湖中的原始文件时将提供更好的性能
使用带foreachBatch()和触发器(once=True)的结构化流将仅处理自上次执行以来的增量文件,并在检查点位置下维护已处理文件的详细信息
process\u multiple\u csv\u different\u schema函数接受一个微批次,它将从相应的csv文件中选取列并写入相应的城市表
相关问题 更多 >
编程相关推荐