<h3>Background:</h3>
<p>I have 12,000 csv files (50 GB) of data. The files mostly share the same format, but some may be missing a column or two, and in some of them the header row may not start on the first line of the file.</p>
<p>I have a class containing two functions that use <code>pandas</code> to analyze and normalize these csv files, which are stored either locally or in a Google bucket.</p>
<p>The following happens inside those functions:</p>
<p><strong>In <code>analyze_files</code></strong></p>
<ul>
<li>Loop over all the files, "peeking" into their contents to determine the headers, and whether any rows need to be skipped to reach the header row</li>
<li>Convert all the collected headers into a standard format, stripping out everything other than alphanumerics and underscores</li>
</ul>
<p><strong>In <code>normalize_files</code></strong></p>
<ul>
<li>Loop over all the files, this time loading each one fully</li>
<li>Convert the column headers to the standardized versions produced by <code>analyze_files</code></li>
<li>Upload or save the updated version of the file</li>
</ul>
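<p>The header standardization described above boils down to two compiled regexes. Here is an isolated sketch of that mapping (the helper name <code>to_bigquery_column</code> is hypothetical, the patterns match the code below):</p>

```python
import re

# anything that isn't alphanumeric/underscore, plus leading digits, becomes "_"
non_alpha = re.compile(r"([\s\W]|^\d+)")
# runs of underscores collapse to a single one
multi_under = re.compile(r"(_{2,})")

def to_bigquery_column(name: str) -> str:
    # e.g. "order date" -> "ORDER_DATE"
    return multi_under.sub("_", non_alpha.sub("_", name)).upper()
```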
<p>These functions work as intended. However, I am looking for ways to speed them up.<br/>
With the version below (simplified to an MVCE) and 12,000 local files (8 cores, 16 GB RAM):</p>
<ul>
<li><code>analyze_files</code> takes roughly 2-4 minutes</li>
<li><code>normalize_files</code> takes roughly 52 minutes</li>
</ul>
<pre class="lang-py prettyprint-override"><code>from google.cloud import storage
import numpy as np
import pandas as pd
import glob
import os
import re

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./service_account_details.json"

nrows = 5  # peek at only the first few rows during analysis (value assumed; undefined in the original)


class MyClass(object):
    def __init__(self, uses_gs=False, gs_bucket_name=None, gs_folder_path=None):
        self.__uses_gs = uses_gs
        if uses_gs:
            self.__gs_client = storage.Client()
            self.__gs_bucket_name = gs_bucket_name
            self.__gs_bucket = self.__gs_client.get_bucket(gs_bucket_name)
            self.__gs_folder_path = gs_folder_path
        else:
            # save to a subfolder of current directory
            self.__save_location = os.path.join(os.path.dirname(os.path.abspath(__file__)), self.__class__.__name__)
            if not os.path.exists(self.__save_location):
                os.mkdir(self.__save_location)
        self.__file_analysis = dict()
        self.__file_columns = set()
        self.__file_column_mapping = dict()

    def analyze_files(self):
        # collect the list of files
        files_to_analyze = list()
        if self.__uses_gs:
            gs_files = self.__gs_client.list_blobs(self.__gs_bucket, prefix=self.__gs_folder_path, delimiter="/")
            for file in gs_files:
                if file.name == self.__gs_folder_path:
                    continue
                gs_filepath = f"gs://{self.__gs_bucket_name}/{file.name}"
                files_to_analyze.append(gs_filepath)
        else:
            local_files = glob.glob(os.path.join(self.__save_location, "*.csv"))
            files_to_analyze.extend(local_files)
        # analyze each collected file
        for filepath in files_to_analyze:
            # determine how many rows to skip in order to start at the header row,
            # then collect the headers for this particular file, to be utilized for comparisons in `normalize_files`
            skiprows = None
            while True:
                try:
                    df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                    break
                except pd.errors.ParserError as e:
                    try:
                        start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                        skiprows = int(start_row_index) - 1
                    except IndexError:
                        # bail out instead of retrying forever with the same skiprows
                        raise RuntimeError("Could not locate start_row_index in pandas ParserError message") from e
            headers = df.columns.values.tolist()
            self.__file_columns.update(headers)
            # store file details as pandas parameters, so we can smoothly transition into reading the files efficiently
            skiprows = skiprows + 1 if skiprows else 1  # now that we know the headers, we can skip the header row
            self.__file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        # convert the columns to their bigquery-compliant equivalents
        non_alpha = re.compile(r"([\s\W]|^\d+)")
        multi_under = re.compile(r"(_{2,})")
        self.__file_column_mapping.update({
            file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
            for file_column in self.__file_columns
        })

    def normalize_files(self):
        # perform the normalizations and upload/save the final results
        total_columns = len(self.__file_columns)
        for filepath, params in self.__file_analysis.items():
            df = pd.read_csv(filepath, **params)
            # rename the column header to align with bigquery columns
            df.rename(columns=self.__file_column_mapping, inplace=True)
            if len(params["names"]) != total_columns:
                # swap the missing column names out for the bigquery equivalents
                missing_columns = [self.__file_column_mapping[c] for c in self.__file_columns - set(params["names"])]
                # add the missing columns to the dataframe
                df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
            if self.__uses_gs:
                blob_path = filepath[5 + len(self.__gs_bucket_name) + 1:]  # "gs://" + "{bucket_name}" + "/"
                self.__gs_bucket.blob(blob_path).upload_from_string(df.to_csv(index=False), "text/csv")
            else:  # save locally
                df.to_csv(filepath, index=False)
</code></pre>
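<p>One thing worth noting about <code>analyze_files</code>: the <code>ParserError</code> retry loop re-parses the file from scratch on each failure. An alternative (not what the code above does, just a sketch of a swapped-in technique) is to sniff the header with the stdlib <code>csv</code> module, touching only the first few lines and assuming the header is the first row with the maximum field count among them:</p>

```python
import csv
from itertools import islice

def sniff_header(path, peek_lines=10):
    """Return (skiprows, headers) by scanning only the first few lines.

    Heuristic: the header is assumed to be the first row with the
    maximum field count among the peeked lines.
    """
    with open(path, newline="") as fh:
        rows = list(islice(csv.reader(fh), peek_lines))
    widest = max(len(row) for row in rows)
    for skiprows, row in enumerate(rows):
        if len(row) == widest:
            return skiprows, row
```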
<p>I have considered using <code>dask</code> in combination with a <code>ProcessPool</code> and a <code>ThreadPool</code> from the <code>multiprocessing</code> module. However, I am struggling with exactly which approach to take.</p>
<p>Since the dataframe operations are CPU-bound, they seem best suited to <code>dask</code>, possibly combined with a <code>ProcessPool</code> to divide the 12k files across the 8 available cores, after which <code>dask</code> would use threads on each core (working around the GIL limitation).</p>
<p>Uploading the files back to disk or to the Google bucket seems better suited to a <code>ThreadPool</code>, since that activity is network-bound.</p>
<p>As for reading the files from a Google bucket, I am unsure which approach is most efficient.</p>
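<p>One way to combine the two ideas above (processes for the CPU-bound parsing, threads for the network-bound saving) is a <code>concurrent.futures</code> pipeline along these lines; <code>normalize_one</code> and <code>upload_one</code> are hypothetical stand-ins, not part of the class above:</p>

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def normalize_one(item):
    # CPU-bound stage: stand-in for read_csv + rename + to_csv(index=False)
    filepath, params = item
    return filepath, f"csv-text-for-{filepath}"

def upload_one(result):
    # network-bound stage: stand-in for the blob upload / local save
    filepath, csv_text = result
    return filepath

def run_pipeline(file_analysis, cores=8, uploaders=16):
    # processes chew through the dataframe work while threads overlap the uploads
    with ProcessPoolExecutor(max_workers=cores) as procs, \
         ThreadPoolExecutor(max_workers=uploaders) as threads:
        normalized = procs.map(normalize_one, file_analysis.items())
        return list(threads.map(upload_one, normalized))
```

<p>Whether this beats a pure <code>dask.delayed</code> approach would need benchmarking; the point is only that the two stages have different bottlenecks and can use different pool types.</p>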
<p>It basically boils down to two scenarios:</p>
<ol>
<li>Which approaches/logic perform best when working with local files</li>
<li>Which approaches/logic perform best when pulling from, and saving back to (overwriting/updating), a Google bucket</li>
</ol>
<p>Can anyone provide some direction, or code, that achieves the most efficient speed-up for the two functions above?</p>
<p>Benchmarks would be greatly appreciated, as I have spent a week mulling over this topic, and it would be great to have statistics backing the methodology decisions.</p>
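<p>Timings like the ones below can be collected with a simple <code>perf_counter</code> decorator (a generic sketch, not tied to any particular library):</p>

```python
import time
from functools import wraps

def timed(fn):
    """Print how long each call to fn takes, then return its result."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        print(f"['{fn.__name__}'] took {time.perf_counter() - start:.4f}s")
        return result
    return wrapper
```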
<h2>Current benchmarks I have tried</h2>
<pre class="lang-py prettyprint-override"><code>import glob
import os
import re

import dask
import numpy as np
import pandas as pd

# assumed to be defined elsewhere in the benchmark script:
#   gs_client   - an authenticated google.cloud.storage.Client
#   local_dirs  - dict of {size_label: {"directory": ..., "items": ..., "size": ...}}
#   remote_dirs - dict with the same shape, for gs:// paths
#   nrows       - number of rows to peek at during analysis


def local_analysis_test_dir_pd(test_dir):
    file_analysis, file_columns = dict(), set()
    local_files = glob.glob(os.path.join(test_dir, "*.csv"))
    for filepath in local_files:
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    # bail out instead of retrying forever with the same skiprows
                    raise RuntimeError("Could not locate start_row_index in pandas ParserError message") from e
        headers = df.columns.values.tolist()  # noqa
        skiprows = skiprows + 1 if skiprows else 1
        file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        file_columns.update(headers)
    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['local_analysis_test_dir_pd'] result:", len(file_analysis), len(file_columns))
    return file_analysis, file_columns, file_column_mapping
def local_analysis_test_dir_dd(test_dir):
    file_analysis, file_columns = dict(), set()
    local_files = glob.glob(os.path.join(test_dir, "*.csv"))

    def dask_worker(filepath):
        siloed_analysis, siloed_columns = dict(), set()
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    print("Could not locate start_row_index in pandas ParserError message")
                    return siloed_analysis, siloed_columns
        headers = df.columns.values.tolist()
        skiprows = skiprows + 1 if skiprows else 1  # skip the header row when re-reading
        siloed_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        siloed_columns.update(headers)
        return siloed_analysis, siloed_columns

    dask_futures = [dask.delayed(dask_worker)(filepath) for filepath in local_files]
    file_analyses, column_sets = map(list, zip(*list(dask.compute(*dask_futures))))
    for analysis in file_analyses:
        file_analysis.update(analysis)
    file_columns.update(*column_sets)
    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['local_analysis_test_dir_dd'] result:", len(file_analysis), len(file_columns))
    return file_analysis, file_columns, file_column_mapping
def remote_analysis_test_dir_pd(test_dir):
    remote_files, file_analysis, file_columns = list(), dict(), set()
    prefix = test_dir.replace("gs://webscraping/", "") + "/"
    gs_files = gs_client.list_blobs("webscraping", prefix=prefix, delimiter="/")
    for file in gs_files:
        if file.name == prefix:
            continue
        elif not file.name.endswith(".csv"):  # also skips .xlsx files
            continue
        gs_filepath = f"gs://webscraping/{file.name}"
        remote_files.append(gs_filepath)
    for filepath in remote_files:
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    # bail out instead of retrying forever with the same skiprows
                    raise RuntimeError("Could not locate start_row_index in pandas ParserError message") from e
        headers = df.columns.values.tolist()  # noqa
        skiprows = skiprows + 1 if skiprows else 1
        file_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        file_columns.update(headers)
    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['remote_analysis_test_dir_pd'] result:", len(file_analysis), len(file_columns))
    return file_analysis, file_columns, file_column_mapping
def remote_analysis_test_dir_dd(test_dir):
    remote_files, file_analysis, file_columns = list(), dict(), set()
    prefix = test_dir.replace("gs://webscraping/", "") + "/"
    gs_files = gs_client.list_blobs("webscraping", prefix=prefix, delimiter="/")
    for file in gs_files:
        if file.name == prefix:
            continue
        elif not file.name.endswith(".csv"):  # also skips .xlsx files
            continue
        gs_filepath = f"gs://webscraping/{file.name}"
        remote_files.append(gs_filepath)

    def dask_worker(filepath):
        siloed_analysis, siloed_columns = dict(), set()
        skiprows = None
        while True:
            try:
                df = pd.read_csv(filepath, nrows=nrows, skiprows=skiprows)
                break
            except pd.errors.ParserError as e:
                try:
                    start_row_index = re.findall(r"Expected \d+ fields in line (\d+), saw \d+", str(e))[0]
                    skiprows = int(start_row_index) - 1
                except IndexError:
                    print("Could not locate start_row_index in pandas ParserError message")
                    return siloed_analysis, siloed_columns
        headers = df.columns.values.tolist()
        skiprows = skiprows + 1 if skiprows else 1  # skip the header row when re-reading
        siloed_analysis[filepath] = dict(skiprows=skiprows, names=headers, dtype=dict.fromkeys(headers, str))
        siloed_columns.update(headers)
        return siloed_analysis, siloed_columns

    dask_futures = [dask.delayed(dask_worker)(filepath) for filepath in remote_files]
    file_analyses, column_sets = map(list, zip(*list(dask.compute(*dask_futures))))
    for analysis in file_analyses:
        file_analysis.update(analysis)
    file_columns.update(*column_sets)
    non_alpha = re.compile(r"([\s\W]|^\d+)")
    multi_under = re.compile(r"(_{2,})")
    file_column_mapping = {
        file_column: multi_under.sub("_", non_alpha.sub("_", file_column)).upper()
        for file_column in file_columns
    }
    # print dictionary length for sanity check; to ensure both functions are performing identical actions.
    print("['remote_analysis_test_dir_dd'] result:", len(file_analysis), len(file_columns))
    return file_analysis, file_columns, file_column_mapping
def normalization_plain_with_pd(file_analysis, file_columns, file_column_mapping, meta_columns):
    total_columns = len(file_columns)
    for filepath, params in file_analysis.items():
        df = pd.read_csv(filepath, **params)
        # rename the column header to align with bigquery columns
        df.rename(columns=file_column_mapping, inplace=True)
        if len(params["names"]) != total_columns:
            missing_columns = [file_column_mapping[c] for c in file_columns - set(params["names"])]
            # add the missing columns to the dataframe
            df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
        fpath, fname = os.path.split(filepath)
        if not fpath.startswith("gs://"):
            updated_path = os.path.join(fpath, "normalized_with_pd")
            if not os.path.exists(updated_path):
                os.mkdir(updated_path)
            new_path = os.path.join(updated_path, fname)
        else:
            new_path = "/".join([fpath, "normalized_with_pd", fname])
        df.to_csv(new_path, index=False)
def normalization_plain_with_dd(file_analysis, _file_columns, _file_column_mapping, _meta_columns):
    def dask_worker(file_item, file_columns, file_column_mapping, meta_columns):
        total_columns = len(file_columns)
        filepath, params = file_item
        df = pd.read_csv(filepath, **params)
        # rename the column header to align with bigquery columns
        df.rename(columns=file_column_mapping, inplace=True)
        if len(params["names"]) != total_columns:
            missing_columns = [file_column_mapping[c] for c in file_columns - set(params["names"])]
            # add the missing columns to the dataframe
            df[[*missing_columns]] = pd.DataFrame([[np.nan] * len(missing_columns)], index=df.index)
        fpath, fname = os.path.split(filepath)
        if not fpath.startswith("gs://"):
            updated_path = os.path.join(fpath, "normalized_with_dd")
            if not os.path.exists(updated_path):
                os.mkdir(updated_path)
            new_path = os.path.join(updated_path, fname)
        else:
            new_path = "/".join([fpath, "normalized_with_dd", fname])
        df.to_csv(new_path, index=False)

    dask_futures = [
        dask.delayed(dask_worker)(file_item, _file_columns, _file_column_mapping, _meta_columns)
        for file_item in file_analysis.items()
    ]
    dask.compute(*dask_futures)
if __name__ == "__main__":
    for size, params in local_dirs.items():
        print(f"['{size}_local_analysis_dir_tests'] ({params['items']} files, {params['size']})")
        local_analysis_test_dir_pd(params["directory"])
        local_analysis_test_dir_dd(params["directory"])
    for size, settings in local_dirs.items():
        print(f"['{size}_pre_test_file_cleanup']")
        for file in glob.glob(os.path.join(settings["directory"], '*', '*.csv')):
            os.remove(file)
        print(f"['{size}_local_normalization_dir_tests'] ({settings['items']} files, {settings['size']})")
        files, columns, column_mapping = local_analysis_test_dir_pd(settings["directory"])
        normalization_plain_with_pd(files, columns, column_mapping, {})
        normalization_plain_with_dd(files, columns, column_mapping, {})
    for size, settings in remote_dirs.items():
        print(f"['{size}_remote_analysis_dir_tests'] ({settings['items']} files, {settings['size']})")
        _, _, _ = remote_analysis_test_dir_pd(settings["directory"])
        files, columns, column_mapping = remote_analysis_test_dir_dd(settings["directory"])
        print(f"['{size}_remote_normalization_dir_tests'] ({settings['items']} files, {settings['size']})")
        normalization_plain_with_pd(files, columns, column_mapping, {})
        normalization_plain_with_dd(files, columns, column_mapping, {})
</code></pre>
<p><strong>Conclusions so far:</strong></p>
<ul>
<li><p><code>local_analysis</code> is fastest with plain <code>pandas.read_csv</code>, based on:</p>
<ul>
<li>a single 343 MB file (0.0210s with <code>pandas</code> vs 0.5141s with <code>dask</code>)</li>
<li>a small directory of 8 files / 1.12 GB (0.1263s with <code>pandas</code> vs 0.1357s with <code>dask</code>)</li>
<li>a medium directory of 474 files / 2.03 GB (3.2991s with <code>pandas</code> vs 3.7717s with <code>dask</code>)</li>
<li>an xlarge directory of 13,361 files / 46.30 GB (131.5941s with <code>pandas</code> vs 132.6982s with <code>dask</code>)</li>
</ul>
</li>
<li><p><code>local_normalization</code> is fastest with plain <code>pandas.read_csv</code>, based on:</p>
<ul>
<li>a small directory of 8 files / 1.12 GB (61.2338s with <code>pandas</code> vs 62.2033s with <code>dask</code>)</li>
<li>a medium directory of 474 files / 2.03 GB (136.8900s with <code>pandas</code> vs 132.7574s with <code>dask</code>)</li>
<li>an xlarge directory of 13,361 files / 46.30 GB (3166.0797s with <code>pandas</code> vs 3265.4251s with <code>dask</code>)</li>
</ul>
</li>
<li><p><code>remote_analysis</code> is fastest with <code>dask.delayed</code>, based on:</p>
<ul>
<li>a small directory of 8 files / 1.12 GB (8.6728s with <code>pandas</code> vs 6.0795s with <code>dask</code>)</li>
<li>a medium directory of 474 files / 2.03 GB (149.7931s with <code>pandas</code> vs 37.3509s with <code>dask</code>)</li>
</ul>
</li>
<li><p><code>remote_normalization</code> is fastest with <code>dask.delayed</code>, based on:</p>
<ul>
<li>a small directory of 8 files / 1.12 GB (1758.1562s with <code>pandas</code> vs 1431.9895s with <code>dask</code>)</li>
<li>medium and xlarge datasets have not yet been benchmarked</li>
</ul>
</li>
<li><p><strong>Note:</strong> the <code>dask</code> tests use <code>pandas.read_csv</code> inside the <code>dask.delayed()</code> calls, as that gave the largest time reduction</p>
</li>
</ul>