我有12000个csv文件(50gb)的数据,这些文件大多具有相同的格式,但有些可能缺少一列或两列,有些标题行可能并不总是从文件的第一行开始
我有一个类,其中包含两个函数,它们利用pandas
来分析和规范这些存储在本地或google bucket中的csv文件
这些功能中会发生以下操作:
在analyze_files
在normalize_files
analyze_files
转换为标题的标准化版本李>这些功能按预期工作。但是,我正在寻找可以加快速度的方法。
使用以下版本(简化为mvce)和12000个本地文件(8核16gb ram)
analyze_files
大约需要2-4分钟normalize_files
大约需要52分钟from google.cloud import storage
import pandas as pd
import glob
import os
import re
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./service_account_details.json"
class MyClass(object):
def __init__(self, uses_gs=False, gs_bucket_name=None, gs_folder_path=None):
self.__uses_gs = uses_gs
if uses_gs:
self.__gs_client = storage.Client()
self.__gs_bucket_name = gs_bucket_name
self.__gs_bucket = self.__gs_client.get_bucket(gs_bucket_name)
self.__gs_folder_path = gs_folder_path
else:
# save to a subfolder of current directory
self.__save_location = os.path.join(os.path.dirname(os.path.abspath(__file__)), self.__name__)
if not os.path.exists(self.__save_location):
os.mkdir(self.__save_location)
self.__file_analysis = dict()
self.__file_columns = set()
self.__file_column_mapping = dict()
def analyze_files(self):
# collect the list of files
files_to_analyze = list()
if self.__uses_gs:
gs_files = self.__gs_client.list_blobs(self.__gs_bucket, prefix=self.__gs_folder_path, delimiter="/")
for file in gs_files:
if file.name == self.__gs_folder_path:
continue
gs_filepath = f"gs://{self._gs_bucket_name}/{file.name}"
files_to_analyze.append(gs_filepath)
else:
local_files = glob.glob(os.path.join(self.__save_location, "*.csv"))
files_to_analyze.extend(local_files)
# analyze each collected file
for filepath in files_to_analyze:
# determine how many rows to skip in order to start at the header row,
# then collect the headers for this particular file, to be utilized for comparisons in `normalize_files`
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
continue
headers = df.columns.values.tolist()
self.__file_columns.update(headers)
# store file details as pandas parameters, so we can smoothly transition into reading the files efficiently
skiprows = skiprows + 1 if skiprows else 1 # now that we know the headers, we can skip the header row
self.__file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
# convert the columns to their bigquery-compliant equivalents
non_alpha = re.compile(r"([\s\W]|^\d+)")
multi_under = re.compile(r"(_{2,})")
self.__file_column_mapping.update({
file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
for file_column in self.__file_columns
})
def normalize_files(self):
# perform the normalizations and upload/save the final results
total_columns = len(self.__file_columns)
for filepath, params in self.__file_analysis.items():
df = pd.read_csv(filepath, **params)
# rename the column header to align with bigquery columns
df.rename(columns=self.__file_column_mapping, inplace=True)
if len(params["names"]) != total_columns:
# swap the missing column names out for the bigquery equivalents
missing_columns = [self.__file_column_mapping[c] for c in self.__file_columns - set(params["names"])]
# add the missing columns to the dataframe
df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
if self.__uses_gs:
blob_path = filepath[5 + len(self.__gs_bucket_name) + 1:] # "gs://" + "{bucket_name}" + "/"
self.__gs_bucket.blob(blob_path).upload_from_string(df.to_csv(index=False), "text/csv")
else: # save locally
df.to_csv(filepath, index=False)
我考虑使用dask
,与来自multiprocessing
模块的ProcessPool
和ThreadPool
结合使用。但是,我正挣扎着到底该采取什么样的方法
由于数据帧操作受CPU限制,它们似乎最适合于dask
,可能与ProcessPool
相结合,将12k文件划分到8个可用的内核上,然后dask
将利用每个内核的线程(克服GIL限制)
将文件上传回磁盘或google bucket似乎更适合于ThreadPool
,因为该活动受网络限制
至于从Google bucket读取文件,我不确定哪种方法最有效
基本上可以归结为两种景观:
有人能提供一些方向或代码,为上述两个功能提供最有效的速度提升吗
基准测试将不胜感激,因为我已经花了一周的时间来思考这个话题,如果能有统计数据来支持方法论的决策,那就太好了
def local_analysis_test_dir_pd(test_dir):
file_analysis, file_columns = dict(), set()
local_files = glob.glob(os.path.join(test_dir, "*.csv"))
for filepath in local_files:
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
continue
headers = df.columns.values.tolist() # noqa
skiprows = skiprows + 1 if skiprows else 1
file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
file_columns.update(headers)
non_alpha = re.compile(r"([\s\W]|^\d+)")
multi_under = re.compile(r"(_{2,})")
file_column_mapping = {
file_column: multi_under.sub(" ", non_alpha.sub("_", file_column)).upper()
for file_column in file_columns
}
# print dictionary length for sanity check; to ensure both functions are performing identical actions.
print("['local_analysis_test_dir_pd'] result:", len(file_analysis), len(file_columns))
return file_analysis, file_columns, file_column_mapping
def local_analysis_test_dir_dd(test_dir):
file_analysis, file_columns = dict(), set()
local_files = glob.glob(os.path.join(test_dir, "*.csv"))
def dask_worker(filepath):
siloed_analysis, siloed_columns = dict(), set()
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
return siloed_analysis, siloed_columns
headers = df.columns.values.tolist()
siloed_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
siloed_columns.update(headers)
return siloed_analysis, siloed_columns
dask_futures = [dask.delayed(dask_worker)(filepath) for filepath in local_files]
file_analyses, column_sets = map(list, zip(*list(dask.compute(*dask_futures))))
for analysis in file_analyses:
file_analysis.update(analysis)
file_columns.update(*column_sets)
non_alpha = re.compile(r"([\s\W]|^\d+)")
multi_under = re.compile(r"(_{2,})")
file_column_mapping = {
file_column: multi_under.sub(" ", non_alpha.sub("_", file_column)).upper()
for file_column in file_columns
}
# print dictionary length for sanity check; to ensure both functions are performing identical actions.
print("['local_analysis_test_dir_dd'] result:", len(file_analysis), len(file_columns))
def remote_analysis_test_dir_pd(test_dir):
remote_files, file_analysis, file_columns = list(), dict(), set()
prefix = test_dir.replace("gs://webscraping/", "") + "/"
gs_files = gs_client.list_blobs("webscraping", prefix=prefix, delimiter="/")
for file in gs_files:
if file.name == prefix:
continue
elif file.name.endswith(".xlsx"):
continue
elif not file.name.endswith(".csv"):
continue
gs_filepath = f"gs://webscraping/{file.name}"
remote_files.append(gs_filepath)
for filepath in remote_files:
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
continue
headers = df.columns.values.tolist() # noqa
skiprows = skiprows + 1 if skiprows else 1
file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
file_columns.update(headers)
non_alpha = re.compile(r"([\s\W]|^\d+)")
multi_under = re.compile(r"(_{2,})")
file_column_mapping = {
file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
for file_column in file_columns
}
# print dictionary length for sanity check; to ensure both functions are performing identical actions.
print("['remote_analysis_test_dir_pd'] result:", len(file_analysis), len(file_columns))
return file_analysis, file_columns, file_column_mapping
def remote_analysis_test_dir_dd(test_dir):
remote_files, file_analysis, file_columns = list(), dict(), set()
prefix = test_dir.replace("gs://webscraping/", "") + "/"
gs_files = gs_client.list_blobs("webscraping", prefix=prefix, delimiter="/")
for file in gs_files:
if file.name == prefix:
continue
elif file.name.endswith(".xlsx"):
continue
elif not file.name.endswith(".csv"):
continue
gs_filepath = f"gs://webscraping/{file.name}"
remote_files.append(gs_filepath)
def dask_worker(filepath):
siloed_analysis, siloed_columns = dict(), set()
skiprows = None
while True:
try:
df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
break
except pd.errors.ParserError as e:
try:
start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
skiprows = int(start_row_index) - 1
except IndexError:
print("Could not locate start_row_index in pandas ParserError message")
return siloed_analysis, siloed_columns
headers = df.columns.values.tolist()
siloed_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
siloed_columns.update(headers)
return siloed_analysis, siloed_columns
dask_futures = [dask.delayed(dask_worker)(filepath) for filepath in remote_files]
file_analyses, column_sets = map(list, zip(*list(dask.compute(*dask_futures))))
for analysis in file_analyses:
file_analysis.update(analysis)
file_columns.update(*column_sets)
non_alpha = re.compile(r"([\s\W]|^\d+)")
multi_under = re.compile(r"(_{2,})")
file_column_mapping = {
file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
for file_column in file_columns
}
# print dictionary length for sanity check; to ensure both functions are performing identical actions.
print("['remote_analysis_test_dir_dd'] result:", len(file_analysis), len(file_columns))
return file_analysis, file_columns, file_column_mapping
def normalization_plain_with_pd(file_analysis, file_columns, file_column_mapping, meta_columns):
total_columns = len(file_columns)
for filepath, params in file_analysis.items():
df = pd.read_csv(filepath, **params)
# rename the column header to align with bigquery columns
df.rename(columns=file_column_mapping, inplace=True)
if len(params["names"]) != total_columns:
missing_columns = [file_column_mapping[c] for c in file_columns - set(params["names"])]
# add the missing columns to the dataframe
df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
fpath, fname = os.path.split(filepath)
if not fpath.startswith("gs://"):
updated_path = os.path.join(fpath, "normalized_with_pd")
if not os.path.exists(updated_path):
os.mkdir(updated_path)
new_path = os.path.join(updated_path, fname)
else:
new_path = "/".join([fpath, "normalized_with_pd", fname])
df.to_csv(new_path, index=False)
def normalization_plain_with_dd(file_analysis, _file_columns, _file_column_mapping, _meta_columns):
def dask_worker(file_item, file_columns, file_column_mapping, meta_columns):
total_columns = len(file_columns)
filepath, params = file_item
df = pd.read_csv(filepath, **params)
# rename the column header to align with bigquery columns
df.rename(columns=file_column_mapping, inplace=True)
if len(params["names"]) != total_columns:
missing_columns = [file_column_mapping[c] for c in file_columns - set(params["names"])]
# add the missing columns to the dataframe
df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
fpath, fname = os.path.split(filepath)
if not fpath.startswith("gs://"):
updated_path = os.path.join(fpath, "normalized_with_dd")
if not os.path.exists(updated_path):
os.mkdir(updated_path)
new_path = os.path.join(updated_path, fname)
else:
new_path = "/".join([fpath, "normalized_with_dd", fname])
df.to_csv(new_path, index=False)
dask_futures = [
dask.delayed(dask_worker)(file_item, _file_columns, _file_column_mapping, _meta_columns)
for file_item in file_analysis.items()
]
dask.compute(*dask_futures)
if __name__ == "__main__":
for size, params in local_dirs.items():
print(f"['{size}_local_analysis_dir_tests'] ({params['items']} files, {params['size']})")
local_analysis_test_dir_pd(params["directory"])
local_analysis_test_dir_dd(params["directory"])
for size, settings in local_dirs.items():
print(f"['{size}_pre_test_file_cleanup']")
for file in glob.glob(os.path.join(settings["directory"], '*', '*.csv')):
os.remove(file)
print(f"['{size}_local_normalization_dir_tests'] ({settings['items']} files, {settings['size']})")
files, columns, column_mapping = local_analysis_test_dir_pd(settings["directory"])
local_normalization_plain_with_pd(files, columns, column_mapping, {})
local_normalization_plain_with_dd(files, columns, column_mapping, {})
for size, settings in remote_dirs.items():
print(f"['{size}_remote_analysis_dir_tests'] ({settings['items']} files, {settings['size']})")
_, _, _ = remote_analysis_test_dir_pd(settings["directory"])
files, columns, column_mapping = remote_analysis_test_dir_dd(settings["directory"])
print(f"['{size}_remote_normalization_dir_tests'] ({settings['items']} files, {settings['size']})")
normalization_plain_with_pd(files, columns, column_mapping, {})
normalization_plain_with_dd(files, columns, column_mapping, {})
到目前为止的结论:
local_analysis
与pandas.from_csv
相比速度最快,基于:
pandas
时为0.0210秒,使用dask
时为0.5141秒)pandas
时为0.1263秒,使用dask
时为0.1357秒)pandas
时为3.2991秒,使用dask
时为3.7717秒)pandas
时为131.5941秒,使用dask
时为132.6982秒)local_normalization
与pandas.from_csv
相比速度最快,基于:
pandas
时为61.2338秒,使用dask
时为62.2033秒)pandas
时为136.8900秒,使用dask
时为132.7574秒)pandas
时为3166.0797秒,使用dask
时为3265.4251秒)remote_analysis
与dask.delayed
相比速度最快,基于:
pandas
时为8.6728秒,使用dask
时为6.0795秒)pandas
时为149.7931秒,使用dask
时为37.3509秒)李>remote_normalization
与dask.delayed
相比速度最快,基于:
pandas
时为1758.1562秒,使用dask
时为1431.9895秒)注意:dask
测试利用pandas.from_csv
内部dask.delayed()
调用来获得最大的时间减少
就像其他代码所说的那样,
upload_from_string
位需要一段时间。您是否考虑过将它们写入Google BigQuery,而不是将它们保存为bucket中的.csv文件?我发现这对我来说更快这里可能适合使用
delayed
API。您提供的类相当复杂,但这是适用于这种情况的大致模式:对于您的案例,可能有一个更有效的ETL过程,但这是特定于具体情况的,因此假设有必要对文件进行迭代以发现要跳过的行数,那么使用
delayed
进行包装可能足以将df处理时间缩短一倍相关问题 更多 >
编程相关推荐