<p>Your code is very inefficient because it opens a file and appends data to it for every single line of the input file it processes. If the input file is as large as you describe, that amounts to a huge number of times, and the operating-system calls required to do it are relatively slow.</p>
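<p>To see why this matters, here is a minimal, self-contained sketch (with made-up file names and data) comparing the open-per-line pattern to opening each output file once and reusing the handle:</p>
<pre><code>import os
import tempfile
import timeit

lines = ["0000320193|data\n"] * 1000
tmpdir = tempfile.mkdtemp()
slow_path = os.path.join(tmpdir, "slow.csv")
fast_path = os.path.join(tmpdir, "fast.csv")

def open_per_line(path):
    # Roughly what the original loop does: one open()/close() per line.
    for line in lines:
        with open(path, "a") as f:
            f.write(line)

def open_once(path):
    # Open the file a single time and reuse the handle.
    with open(path, "a") as f:
        for line in lines:
            f.write(line)

t_slow = timeit.timeit(lambda: open_per_line(slow_path), number=1)
t_fast = timeit.timeit(lambda: open_once(fast_path), number=1)
print("open per line: {:.4f}s, open once: {:.4f}s".format(t_slow, t_fast))
</code></pre>
<p>Both versions write identical output; the second simply avoids the repeated system calls.</p>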
<p>I also noticed at least one bug in your code, namely this line:</p>
<pre><code>save_path += cik + ".csv"
</code></pre>
<p>which just makes <code>save_path</code> longer and longer on every iteration, which is not what is needed.</p>
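<p>A quick illustration of the difference, using a couple of made-up CIK values: the buggy version concatenates every previous file name into the path, while building each path separately with <code>os.path.join()</code> leaves <code>save_path</code> untouched:</p>
<pre><code>import os

save_path = "./data-sorted"
ciks = ["0000320193", "0000789019"]

# Buggy pattern: save_path itself grows on every iteration.
buggy = save_path
buggy_paths = []
for cik in ciks:
    buggy += cik + ".csv"
    buggy_paths.append(buggy)

# Correct: derive a fresh path each time; save_path is never modified.
good_paths = [os.path.join(save_path, cik + ".csv") for cik in ciks]

print(buggy_paths)
print(good_paths)
</code></pre>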
<p>At any rate, here is an approach that should work considerably faster, although processing a file that large will likely still take quite a while. It speeds things up by caching intermediate results: it opens each distinct output csv file and creates its corresponding <code>csv.writer</code> object as infrequently as possible, namely the first time it is needed, and again only if it was closed because the cache had reached its maximum length.</p>
<p>Be aware that the cache itself may consume a lot of memory, depending on how many unique csv output files there are and how many of them can be open at the same time, but using a lot of memory is exactly what makes it run faster. You will need to experiment and manually tune the <code>MAX_OPEN</code> value to find the best trade-off between speed and memory use, while staying under the limit your OS places on how many files can be open at once.</p>
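<p>On POSIX systems you can query that per-process limit with the standard-library <code>resource</code> module (not available on Windows), which gives a reasonable starting point for <code>MAX_OPEN</code>; the headroom of 64 below is an arbitrary choice to leave room for stdin/stdout and any other descriptors the process uses:</p>
<pre><code>import resource  # POSIX-only standard-library module.

# The soft limit is what currently applies; the hard limit is the
# ceiling the soft limit could be raised to.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
MAX_OPEN = max(2, soft - 64)  # Leave headroom for other open descriptors.
print("soft limit: {}, MAX_OPEN: {}".format(soft, MAX_OPEN))
</code></pre>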
<p>Also note that the code could likely be made even more efficient by choosing the existing file entry to close more intelligently, rather than picking an open one at random. Whether doing so would actually help, however, depends on whether the data in the input file has any favorable grouping or other ordering to it.</p>
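<p>For instance, if the input tends to contain runs of consecutive lines for the same CIKs, a least-recently-used policy would likely beat random eviction. Here is a minimal, hypothetical sketch of the idea built on <code>collections.OrderedDict</code> (it caches bare file objects rather than writer/file pairs, to keep it short; the class and method names are made up for illustration):</p>
<pre><code>from collections import OrderedDict

class LRUFileCache(OrderedDict):
    """ Sketch of least-recently-used eviction for open file handles:
        instead of closing a random open file, close the one that was
        used longest ago.
    """
    def __init__(self, max_open):
        super().__init__()
        self.max_open = max_open

    def get_file(self, path):
        if path in self:
            self.move_to_end(path)  # Mark as most recently used.
            return self[path]
        if len(self) &gt;= self.max_open:
            old_path, old_file = self.popitem(last=False)  # Oldest entry.
            old_file.close()
        # Open in append mode so previously evicted files keep their
        # earlier contents when re-opened.
        f = open(path, "a")
        self[path] = f
        return f
</code></pre>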
<pre><code>import csv
import os
import random


class CSVWriterCache(dict):
    """ Dict subclass to cache pairs of csv files and associated
        csv.writers. When a specified maximum number of them already
        exist, a random one is closed, but an entry for it is retained
        and marked "closed" so it can be re-opened in append mode
        later if it's ever referenced again. This limits the number of
        files open at any given time.
    """
    _CLOSED = None  # Marker to indicate that a file has been seen before.

    def __init__(self, max_open, **kwargs):
        self.max_open = max_open
        self.cur_open = 0  # Number of currently opened csv files.
        self.csv_kwargs = kwargs  # Keyword args for csv.writer.

    # Adding the next two non-dict special methods makes the class a
    # context manager which allows it to be used in "with" statements
    # to do automatic clean-up.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __getitem__(self, k):
        if k not in self:
            return self.__missing__(k)
        try:
            csv_writer, csv_file = self.get(k)
        except TypeError:  # Entry is _CLOSED: re-open in append mode.
            if self.cur_open == self.max_open:
                self._close_random_entry()
            csv_file = open(k, 'a', newline='')
            csv_writer = csv.writer(csv_file, **self.csv_kwargs)
            self.cur_open += 1
            super().__setitem__(k, (csv_writer, csv_file))  # Re-cache pair.
        return csv_writer, csv_file

    def __missing__(self, csv_file_path):
        """ Create a csv.writer corresponding to the file path and add it
            and the file to the cache.
        """
        if self.cur_open == self.max_open:  # At the limit?
            self._close_random_entry()
        csv_file = open(csv_file_path, 'w', newline='')
        csv_writer = csv.writer(csv_file, **self.csv_kwargs)
        self.cur_open += 1
        # Add pair to cache.
        super().__setitem__(csv_file_path, (csv_writer, csv_file))
        return csv_writer, csv_file

    def _close_random_entry(self):
        """ Randomly choose a cached entry with a previously seen file
            path that is still open (not _CLOSED) and close its file.
            The entry for the file path is left in the dictionary so it
            can be recognized as having been seen before and be
            re-opened in append mode.
        """
        while True:
            rand_key = random.choice(tuple(self.keys()))
            entry = self.get(rand_key)  # Bypass overridden __getitem__.
            if entry is not self._CLOSED:
                break
        entry[1].close()  # Close the csv file of the chosen pair.
        self.cur_open -= 1
        self[rand_key] = self._CLOSED  # Mark as previously seen but closed.

    # Added, non-standard dict method.
    def close(self):
        """ Close all the opened files in the cache and clear it out. """
        for key, entry in self.items():
            if entry is not self._CLOSED:
                entry[1].close()
                self[key] = self._CLOSED  # Not strictly necessary.
                self.cur_open -= 1  # For sanity check at end.
        self.clear()
        assert self.cur_open == 0  # Sanity check.


if __name__ == '__main__':
    file_path = "./master.tsv"
    save_path = "./data-sorted"
    MAX_OPEN = 1000  # Number of open files allowed (max is OS-dependent).
#    MAX_OPEN = 2  # Use a small value for testing.

    # Create the output directory if it does not exist.
    if os.path.exists(save_path):
        if not os.path.isdir(save_path):
            raise RuntimeError(
                "Path {!r} exists, but isn't a directory".format(save_path))
    else:
        print('Creating directory: {!r}'.format(save_path))
        os.makedirs(save_path)

    # Process the input file using a cache of csv.writers.
    with open(file_path, 'r') as masterfile, \
         CSVWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL) as csv_writer_cache:
        for line in masterfile:
            line_split = line.rstrip().split("|")
            cik = line_split[0].zfill(10)
            save_file_path = os.path.join(save_path, cik + ".csv")
            writer = csv_writer_cache[save_file_path][0]
            writer.writerow(line_split)

    print('{!r} file processing completed'.format(os.path.basename(file_path)))
</code></pre>