Read multiple groups of CSV files from a folder and insert them in parallel into the corresponding target tables using Spark or Databricks

Published 2024-07-05 14:11:37


Input: abc.tar.gz -> untar -> folder: abc

Folder structure of abc:

Root folder: abc contains the CSV files generated from 100 cities, one file every 5 minutes per city, per day

Number of CSV files: 100 cities * 12 files/hour * 24 hours = 28,800 CSV files

abc/    
city1_0005.csv
city1_0010.csv
..
city1_2355.csv
..
..
city2_0005.csv
city2_0010.csv
..
city2_2355.csv
..
..
city100_0005.csv
city100_0010.csv

Functional requirements:

  • Use Spark/Databricks to create one table per city and load that city's CSV files (288 per day) into it. The target location ends up with 100 tables in total: 1 city, 1 table
  • Each city has a different schema; the columns differ for every city. Therefore I cannot write all city data into a single partitioned table

Technical requirement: read and process the files in parallel for better performance


I developed the following code, which processes the data sequentially. I am looking for ways to optimize it.

staging_path="abfss://xyz/abc"

# use Databricks utils to get the list of files in the folder
filesProp = dbutils.fs.ls(staging_path)

# extract the city names from the list of file names (e.g. "city1" from "city1_0005.csv")
filesSet = set()
for file in filesProp:
    filesSet.add(file.name.split('_')[0])

# empty dict to store one dataframe per city
dictionary_df = {}

# read each city's CSV files and insert them into that city's table
for fileName in filesSet:
    filePath = staging_path + "/" + fileName + "_*"
    print(filePath)
    dictionary_df[fileName] = spark.read.options(header='True', delimiter=',').csv(filePath)
    dictionary_df[fileName].write.saveAsTable(fileName)
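
A minimal sketch of one way to speed this up, assuming the same staging_path layout and filesSet built above: submit the per-city loads from a thread pool so that several Spark jobs run concurrently instead of one after another (the max_workers value is an arbitrary choice).

from concurrent.futures import ThreadPoolExecutor

def load_city(fileName):
    # read all CSV files for one city and write them to that city's table
    filePath = staging_path + "/" + fileName + "_*"
    df = spark.read.options(header='True', delimiter=',').csv(filePath)
    df.write.saveAsTable(fileName)
    return fileName

# each call triggers its own Spark jobs; running them from a thread pool
# lets the cluster work on several city loads at the same time
with ThreadPoolExecutor(max_workers=8) as pool:
    for finished in pool.map(load_city, filesSet):
        print(f"loaded table {finished}")

This keeps the driver-side loop simple while the cluster processes several cities at once; the answer below goes further and replaces the manual file listing with the cloudFiles source.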

1 Answer

Posted on 2024-07-05 14:11:37

Here is how I would solve this problem:

  • Use a shell script to move the CSVs into per-city folders (a dbutils-based sketch of this step is shown right after this list).

     This will ensure that files with the same schema sit under the same root folder:
     /abc/
         city1/
              20211021/city1_0005 
              20211021/city1_0010
               ...
         city2/
              20211021/city2_0005
              20211021/city2_0010 
    
  • Since you are already on Azure and Databricks, I suggest using the cloudFiles format (Auto Loader), which gives better performance when scanning the raw files in the data lake in parallel than the open-source Structured Streaming + csv option.

  • Structured Streaming with foreachBatch() and trigger(once=True) processes only the files that arrived since the last run, and keeps the details of already-processed files under the checkpoint location.

  • The process_multiple_csvs_different_schema function receives a micro-batch; it selects the columns belonging to each city's CSV files and writes them to the corresponding city table.
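
A minimal sketch of the reorganization step using dbutils instead of a shell script (the src_root and dst_root paths are assumptions; adjust them to your storage layout):

# group the flat CSV dump into per-city folders so each folder holds a single schema
src_root = "abfss://xyz/abc"          # assumed flat staging folder from the question
dst_root = "abfss://xyz/abc_by_city"  # assumed per-city destination root

for f in dbutils.fs.ls(src_root):
    if not f.name.endswith(".csv"):
        continue
    city = f.name.split("_")[0]  # e.g. "city1" from "city1_0005.csv"
    # object stores create the per-city "folder" implicitly on move
    dbutils.fs.mv(f.path, f"{dst_root}/{city}/{f.name}")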

from pyspark.sql import functions as F

tmp_db = "test_multiple_csv_schema"
spark.sql(f"create database if not exists {tmp_db}")
base_path = "<your_base_mount_path_root_folder_for_csvs>"  # fill in your mount/root path for the CSVs
checkpoint_location = f"{base_path}/checkpoint/multiplecsvs"
input_path = f"{base_path}/multiplecsvs/"
schema_location = f"{base_path}/schema/multiplecsvs"
staging_checkpoint_path = f"{base_path}/staging/checkpoint/multiplecsvs"
staging_data_path = f"{base_path}/staging/data/multiplecsvs"
input_format = "csv"

def process_multiple_csvs_different_schema(batch_df):

    # derive the target table name from the source file path
    df = (
        batch_df
            .withColumn("table", F.split(F.col("input_file_name"), r"\.csv")[0])
            .withColumn("table_path", F.split(F.col("table"), "/"))
            .withColumn("table_name", F.element_at(F.col("table_path"), -1))
            .drop("table", "table_path")
    )

    list_of_cities = df.select("table_name").distinct().collect()
    list_of_cities = [city[0] for city in list_of_cities]

    for city in list_of_cities:
        print(f"processing data for {city}")
        city_df = df.where(f"table_name = '{city}'")
        input_file_name = city_df.limit(1).select("input_file_name").collect()[0][0]

        # read one source file to recover this city's column list
        df_schema = spark.read.option("header", True).format(input_format).load(input_file_name)
        select_columns = df_schema.columns

        (
            city_df.select(select_columns)
                   .withColumn("processed_time", F.current_timestamp())
                   .write
                   .option("mergeSchema", True)
                   .mode("append")
                   .format("delta")
                   .saveAsTable(f"{tmp_db}.{city}")
        )

raw_df = (spark
          .readStream
          .format("cloudFiles")
          .option("cloudFiles.format", input_format)
          .option("cloudFiles.schemaLocation", schema_location)
          .option("header", True)  # the source CSVs carry a header row
          .load(input_path)
         )

(
  raw_df.withColumn("input_file_name", F.input_file_name())
        .writeStream
        .format("delta")
        .option("checkpointLocation", staging_checkpoint_path)
        .option("mergeSchema", True)
        .outputMode("append")
        .trigger(once=True)
        .start(staging_data_path)
        .awaitTermination()
)

staging_df = spark.readStream.format("delta").load(staging_data_path)
(
  staging_df.writeStream
     .option("checkpointLocation", checkpoint_location)
     .trigger(once=True)
     .foreachBatch(lambda batch_df, batch_id: process_multiple_csvs_different_schema(batch_df))
     .start()
     .awaitTermination()
)
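
Once the trigger(once=True) run has finished, a quick sanity check (a sketch; the exact table names depend on how table_name is derived from your folder layout):

# list the city tables created by the foreachBatch function
spark.sql(f"show tables in {tmp_db}").show(truncate=False)

# spot-check one table (the table name here is hypothetical)
print(spark.table(f"{tmp_db}.city1").count())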
