如何在12k文件/50gb上加速这些数据帧操作?

2024-05-08 07:16:11 发布

您现在位置:Python中文网/ 问答频道 /正文

背景:

我有12000个csv文件(50gb)的数据,这些文件大多具有相同的格式,但有些可能缺少一列或两列,有些标题行可能并不总是从文件的第一行开始

我有一个类,其中包含两个函数,它们利用pandas来分析和规范这些存储在本地或google bucket中的csv文件

这些功能中会发生以下操作:

analyze_files

  • 循环浏览所有文件,“窥视”它们的内容以确定标题,以及是否需要跳过任何行才能到达标题行
  • 将所有收集的标题转换为标准格式,从文件名中删除除字母数字和下划线以外的所有内容

normalize_files

  • 循环浏览所有文件,这次完全加载每个文件
  • 将列标题从analyze_files转换为标题的标准化版本
  • 上载或保存文件的更新版本

这些功能按预期工作。但是,我正在寻找可以加快速度的方法。
使用以下版本(简化为mvce)和12000个本地文件(8核16gb ram)

  • analyze_files大约需要2-4分钟
  • normalize_files大约需要52分钟
from google.cloud import storage
import pandas as pd
import glob
import os
import re

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./service_account_details.json"


class MyClass(object):
    def __init__(self, uses_gs=False, gs_bucket_name=None, gs_folder_path=None):
        self.__uses_gs = uses_gs
        if uses_gs:
            self.__gs_client = storage.Client()
            self.__gs_bucket_name = gs_bucket_name
            self.__gs_bucket = self.__gs_client.get_bucket(gs_bucket_name)
            self.__gs_folder_path = gs_folder_path
        else:
            # save to a subfolder of current directory
            self.__save_location = os.path.join(os.path.dirname(os.path.abspath(__file__)), self.__name__)
            if not os.path.exists(self.__save_location):
                os.mkdir(self.__save_location)
        self.__file_analysis = dict()
        self.__file_columns = set()
        self.__file_column_mapping = dict()
    
    def analyze_files(self):
        # collect the list of files
        files_to_analyze = list()
        if self.__uses_gs:
            gs_files = self.__gs_client.list_blobs(self.__gs_bucket, prefix=self.__gs_folder_path, delimiter="/")
            for file in gs_files:
                if file.name == self.__gs_folder_path:
                    continue
                gs_filepath = f"gs://{self._gs_bucket_name}/{file.name}"
                files_to_analyze.append(gs_filepath)
        else:
            local_files = glob.glob(os.path.join(self.__save_location, "*.csv"))
            files_to_analyze.extend(local_files)
                
        # analyze each collected file
        for filepath in files_to_analyze:
            # determine how many rows to skip in order to start at the header row,
            # then collect the headers for this particular file, to be utilized for comparisons in `normalize_files`
            skiprows = None
            while True:
                try:
                    df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                    break
                except pd.errors.ParserError as e:
                    try:
                        start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                        skiprows = int(start_row_index) - 1
                    except IndexError:
                        print("Could not locate start_row_index in pandas ParserError message")
                        continue
            headers = df.columns.values.tolist()
            self.__file_columns.update(headers)
            # store file details as pandas parameters, so we can smoothly transition into reading the files efficiently
            skiprows = skiprows + 1 if skiprows else 1  # now that we know the headers, we can skip the header row
            self.__file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
            
        # convert the columns to their bigquery-compliant equivalents
        non_alpha = re.compile(r"([\s\W]|^\d+)")
        multi_under = re.compile(r"(_{2,})")
        self.__file_column_mapping.update({
            file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
            for file_column in self.__file_columns
        })

    def normalize_files(self):
        # perform the normalizations and upload/save the final results
        total_columns = len(self.__file_columns)
        for filepath, params in self.__file_analysis.items():
            df = pd.read_csv(filepath, **params)
            # rename the column header to align with bigquery columns
            df.rename(columns=self.__file_column_mapping, inplace=True)
 
            if len(params["names"]) != total_columns:
                # swap the missing column names out for the bigquery equivalents
                missing_columns = [self.__file_column_mapping[c] for c in self.__file_columns - set(params["names"])]
                # add the missing columns to the dataframe
                df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
 
            if self.__uses_gs:
                blob_path = filepath[5 + len(self.__gs_bucket_name) + 1:]  # "gs://" + "{bucket_name}" + "/"
                self.__gs_bucket.blob(blob_path).upload_from_string(df.to_csv(index=False), "text/csv")
            else:  # save locally
                df.to_csv(filepath, index=False)

我考虑使用dask,与来自multiprocessing模块的ProcessPoolThreadPool结合使用。但是,我正挣扎着到底该采取什么样的方法

由于数据帧操作受CPU限制,它们似乎最适合于dask,可能与ProcessPool相结合,将12k文件划分到8个可用的内核上,然后dask将利用每个内核的线程(克服GIL限制)

将文件上传回磁盘或google bucket似乎更适合于ThreadPool,因为该活动受网络限制

至于从Google bucket读取文件,我不确定哪种方法最有效

基本上可以归结为两种景观:

  1. 使用本地文件时,哪些方法/逻辑的性能最好
  2. 当从Google bucket中提取并保存回(覆盖/更新)Google bucket时,哪些方法/逻辑的性能最好

有人能提供一些方向或代码,为上述两个功能提供最有效的速度提升吗

基准测试将不胜感激,因为我已经花了一周的时间来思考这个话题,如果能有统计数据来支持方法论的决策,那就太好了

我尝试过的当前基准

def local_analysis_test_dir_pd(test_dir):
    file_analysis, file_columns = dict(), set()
    local_files = glob.glob(os.path.join(test_dir, "*.csv"))
    for filepath in local_files:
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    print("Could not locate start_row_index in pandas ParserError message")
                    continue
        headers = df.columns.values.tolist()  # noqa
        skiprows = skiprows + 1 if skiprows else 1
        file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        file_columns.update(headers)

    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub(" ", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['local_analysis_test_dir_pd'] result:", len(file_analysis), len(file_columns))
    return file_analysis, file_columns, file_column_mapping

def local_analysis_test_dir_dd(test_dir):
    file_analysis, file_columns = dict(), set()
    local_files = glob.glob(os.path.join(test_dir, "*.csv"))
    
    def dask_worker(filepath):
        siloed_analysis, siloed_columns = dict(), set()
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    print("Could not locate start_row_index in pandas ParserError message")
                    return siloed_analysis, siloed_columns
        headers = df.columns.values.tolist()
        siloed_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        siloed_columns.update(headers)
        return siloed_analysis, siloed_columns
    
    dask_futures = [dask.delayed(dask_worker)(filepath) for filepath in local_files]
    file_analyses, column_sets = map(list, zip(*list(dask.compute(*dask_futures))))
    for analysis in file_analyses:
        file_analysis.update(analysis)
    file_columns.update(*column_sets)
    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub(" ", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['local_analysis_test_dir_dd'] result:", len(file_analysis), len(file_columns))

def remote_analysis_test_dir_pd(test_dir):
    remote_files, file_analysis, file_columns = list(), dict(), set()
    prefix = test_dir.replace("gs://webscraping/", "") + "/"
    gs_files = gs_client.list_blobs("webscraping", prefix=prefix, delimiter="/")
    for file in gs_files:
        if file.name == prefix:
            continue
        elif file.name.endswith(".xlsx"):
            continue
        elif not file.name.endswith(".csv"):
            continue
        gs_filepath = f"gs://webscraping/{file.name}"
        remote_files.append(gs_filepath)

    for filepath in remote_files:
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    print("Could not locate start_row_index in pandas ParserError message")
                    continue
        headers = df.columns.values.tolist()  # noqa
        skiprows = skiprows + 1 if skiprows else 1
        file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        file_columns.update(headers)

    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['remote_analysis_test_dir_pd'] result:", len(file_analysis), len(file_columns))
    return file_analysis, file_columns, file_column_mapping

def remote_analysis_test_dir_dd(test_dir):
    remote_files, file_analysis, file_columns = list(), dict(), set()
    prefix = test_dir.replace("gs://webscraping/", "") + "/"
    gs_files = gs_client.list_blobs("webscraping", prefix=prefix, delimiter="/")
    for file in gs_files:
        if file.name == prefix:
            continue
        elif file.name.endswith(".xlsx"):
            continue
        elif not file.name.endswith(".csv"):
            continue
        gs_filepath = f"gs://webscraping/{file.name}"
        remote_files.append(gs_filepath)

    def dask_worker(filepath):
        siloed_analysis, siloed_columns = dict(), set()
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    print("Could not locate start_row_index in pandas ParserError message")
                    return siloed_analysis, siloed_columns
        headers = df.columns.values.tolist()
        siloed_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        siloed_columns.update(headers)
        return siloed_analysis, siloed_columns

    dask_futures = [dask.delayed(dask_worker)(filepath) for filepath in remote_files]
    file_analyses, column_sets = map(list, zip(*list(dask.compute(*dask_futures))))
    for analysis in file_analyses:
        file_analysis.update(analysis)
    file_columns.update(*column_sets)
    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['remote_analysis_test_dir_dd'] result:", len(file_analysis), len(file_columns))
    return file_analysis, file_columns, file_column_mapping

def normalization_plain_with_pd(file_analysis, file_columns, file_column_mapping, meta_columns):
    total_columns = len(file_columns)
    for filepath, params in file_analysis.items():
        df = pd.read_csv(filepath, **params)
        # rename the column header to align with bigquery columns
        df.rename(columns=file_column_mapping, inplace=True)
        if len(params["names"]) != total_columns:
            missing_columns = [file_column_mapping[c] for c in file_columns - set(params["names"])]
            # add the missing columns to the dataframe
            df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
        fpath, fname = os.path.split(filepath)
        if not fpath.startswith("gs://"):
            updated_path = os.path.join(fpath, "normalized_with_pd")
            if not os.path.exists(updated_path):
                os.mkdir(updated_path)
            new_path = os.path.join(updated_path, fname)
        else:
            new_path = "/".join([fpath, "normalized_with_pd", fname])
        df.to_csv(new_path, index=False)

def normalization_plain_with_dd(file_analysis, _file_columns, _file_column_mapping, _meta_columns):
    def dask_worker(file_item, file_columns, file_column_mapping, meta_columns):
        total_columns = len(file_columns)
        filepath, params = file_item
        df = pd.read_csv(filepath, **params)
        # rename the column header to align with bigquery columns
        df.rename(columns=file_column_mapping, inplace=True)
        if len(params["names"]) != total_columns:
            missing_columns = [file_column_mapping[c] for c in file_columns - set(params["names"])]
            # add the missing columns to the dataframe
            df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
        fpath, fname = os.path.split(filepath)
        if not fpath.startswith("gs://"):
            updated_path = os.path.join(fpath, "normalized_with_dd")
            if not os.path.exists(updated_path):
                os.mkdir(updated_path)
            new_path = os.path.join(updated_path, fname)
        else:
            new_path = "/".join([fpath, "normalized_with_dd", fname])
        df.to_csv(new_path, index=False)
    dask_futures = [
        dask.delayed(dask_worker)(file_item, _file_columns, _file_column_mapping, _meta_columns)
        for file_item in file_analysis.items()
    ]
    dask.compute(*dask_futures)

if __name__ == "__main__":
    for size, params in local_dirs.items():
        print(f"['{size}_local_analysis_dir_tests'] ({params['items']} files, {params['size']})")
        local_analysis_test_dir_pd(params["directory"])
        local_analysis_test_dir_dd(params["directory"])

    for size, settings in local_dirs.items():
        print(f"['{size}_pre_test_file_cleanup']")
        for file in glob.glob(os.path.join(settings["directory"], '*', '*.csv')):
            os.remove(file)
        print(f"['{size}_local_normalization_dir_tests'] ({settings['items']} files, {settings['size']})")
        files, columns, column_mapping = local_analysis_test_dir_pd(settings["directory"])

        local_normalization_plain_with_pd(files, columns, column_mapping, {})
        local_normalization_plain_with_dd(files, columns, column_mapping, {})

    for size, settings in remote_dirs.items():
        print(f"['{size}_remote_analysis_dir_tests'] ({settings['items']} files, {settings['size']})")
        _, _, _ = remote_analysis_test_dir_pd(settings["directory"])
        files, columns, column_mapping = remote_analysis_test_dir_dd(settings["directory"])

        print(f"['{size}_remote_normalization_dir_tests'] ({settings['items']} files, {settings['size']})")
        normalization_plain_with_pd(files, columns, column_mapping, {})
        normalization_plain_with_dd(files, columns, column_mapping, {})

到目前为止的结论:

  • local_analysispandas.from_csv相比速度最快,基于:

    • 一个343MB的文件(使用pandas时为0.0210秒,使用dask时为0.5141秒)
    • 8个文件/1.12GB的小目录(使用pandas时为0.1263秒,使用dask时为0.1357秒)
    • 474个文件/2.03 GB的中等目录(使用pandas时为3.2991秒,使用dask时为3.7717秒)
    • 13361个文件/46.30GB的xlarge目录(使用pandas时为131.5941秒,使用dask时为132.6982秒)
  • local_normalizationpandas.from_csv相比速度最快,基于:

    • 8个文件/1.12GB的小目录(使用pandas时为61.2338秒,使用dask时为62.2033秒)
    • 474个文件/2.03GB的中等目录(使用pandas时为136.8900秒,使用dask时为132.7574秒)
    • 13361个文件/46.30 GB的xlarge目录(使用pandas时为3166.0797秒,使用dask时为3265.4251秒)
  • remote_analysisdask.delayed相比速度最快,基于:

    • 8个文件/1.12GB的小目录(使用pandas时为8.6728秒,使用dask时为6.0795秒)
    • 474个文件/2.03 GB的中等目录(使用pandas时为149.7931秒,使用dask时为37.3509秒)
  • remote_normalizationdask.delayed相比速度最快,基于:

    • 8个文件/1.12GB的小目录(使用pandas时为1758.1562秒,使用dask时为1431.9895秒)
    • 尚未进行基准测试的中型和大型数据集
  • 注意:dask测试利用pandas.from_csv内部dask.delayed()调用来获得最大的时间减少


Tags: columnspathinselfgsdfforcolumn
2条回答

就像其他代码所说的那样,upload_from_string位需要一段时间。您是否考虑过将它们写入Google BigQuery,而不是将它们保存为bucket中的.csv文件?我发现这对我来说更快

这里可能适合使用delayedAPI。您提供的类相当复杂,但这是适用于这种情况的大致模式:

import dask

@dask.delayed
def analyze_one_file(file_name):
    # use the code you run on a single file here
    return dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))

# form delayed computations
delayed_values = [analyze_one_file(filepath) for filepath in files_to_analyze]

# execute the delayed computations
results = dask.compute(delayed_values)

# now results will be a list of dictionaries (or whatever
# the delayed function returns)

# apply similar wrapping to normalize_files loop

对于您的案例,可能有一个更有效的ETL过程,但这是特定于具体情况的,因此假设有必要对文件进行迭代以发现要跳过的行数,那么使用delayed进行包装可能足以将df处理时间缩短一倍

相关问题 更多 >