Problems with CSV output files when the source file is large

Published 2024-09-15 16:25:20


I have a large TSV file (~2.5 GB). I iterate over it line by line, where each line has six tab-delimited fields. I take the first field of each line and append the whole line to a CSV file named after that first field. The goal is to end up with the master TSV's rows sorted into separate CSV files keyed on that first field.

This works on small test files, but when I run it on the big file the IPython console never finishes. The file I'm saving to looks as if it's being filled, but when I open it nothing shows up.

import csv

file_path = ".../master.tsv"

with open(file_path, 'r') as masterfile:
    for line in masterfile:
        line_split = line.split("|")
        cik = line_split[0].zfill(10)

        save_path = ".../data-sorted/"
        save_path += cik + ".csv"

        with open(save_path, 'a') as savefile:
            wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
            wr.writerow(line_split)

Tags: file, csv, path, tsv, save, as, with, line
2 Answers

Assuming you have enough memory, you're better off sorting the file in memory — for example, by loading it into a dictionary — and writing it out to disk in one pass. If I/O really is your bottleneck, you can gain a lot just by opening each output file only once.

import csv
from collections import defaultdict
from os.path import join

file_path = ".../master.tsv"

data = defaultdict(list)
with open(file_path, 'r') as masterfile:
    for line in masterfile:
        cik = line.split("|", 1)[0].zfill(10)
        data[cik].append(line)

for cik, lines in data.items():
    save_path = join(".../data-sorted", cik + ".csv")

    with open(save_path, 'w', newline='') as savefile:
        wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
        for line in lines:
            wr.writerow(line.rstrip("\n").split("|"))

You may not have enough memory to load the entire file. In that case you can dump it in chunks; if the chunks are big enough, this will still save you a lot of I/O. The chunking approach below is quick and dirty.

import csv
from collections import defaultdict
from itertools import groupby
from os.path import join

chunk_size = 10000  # units of lines

file_path = ".../master.tsv"

with open(file_path, 'r') as masterfile:
    for _, chunk in groupby(enumerate(masterfile),
                            key=lambda item: item[0] // chunk_size):
        data = defaultdict(list)
        for _, line in chunk:  # groupby yields (index, line) pairs here
            cik = line.split("|", 1)[0].zfill(10)
            data[cik].append(line)
        for cik, lines in data.items():
            save_path = join(".../data-sorted", cik + ".csv")

            with open(save_path, 'a', newline='') as savefile:
                wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
                for line in lines:
                    wr.writerow(line.rstrip("\n").split("|"))

Your code is very inefficient because it opens and appends to an output file for every single input line it processes, which adds up to an enormous number of times with an input file that large (and the OS calls required to do it are relatively slow).
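To put a rough number on that per-line open/close cost, here's a self-contained timing sketch (synthetic rows, not the poster's data) comparing re-opening an output file for every row against keeping a single handle open:

```python
import csv
import os
import tempfile
import timeit

# Synthetic rows, purely for illustration.
rows = [["0000320193", "AAPL", "10-K"]] * 10_000
tmpdir = tempfile.mkdtemp()

def reopen_per_row():
    """Open and close the output file once per row (the slow pattern)."""
    path = os.path.join(tmpdir, "reopen.csv")
    for row in rows:
        with open(path, 'a', newline='') as f:
            csv.writer(f, quoting=csv.QUOTE_ALL).writerow(row)

def single_open():
    """Open the output file once and reuse one csv.writer."""
    path = os.path.join(tmpdir, "single.csv")
    with open(path, 'w', newline='') as f:
        wr = csv.writer(f, quoting=csv.QUOTE_ALL)
        for row in rows:
            wr.writerow(row)

t_reopen = timeit.timeit(reopen_per_row, number=1)
t_single = timeit.timeit(single_open, number=1)
print(f"re-open per row: {t_reopen:.3f}s, single open: {t_single:.3f}s")
```

On a typical machine the single-open version wins by a wide margin, which is the motivation for caching the open writers instead of re-opening per line.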

I also noticed at least one bug in your code, namely this line:

save_path += cik + ".csv"

which just makes save_path grow longer and longer — not what's needed.

Anyway, here's an approach that should work considerably faster, although processing a file that big will likely still take quite a while. It speeds things up by caching intermediate results: it opens each distinct output CSV file — and creates its corresponding csv.writer object — as infrequently as possible: the first time it is needed, and again only if it was closed because the cache reached its maximum length.

Note that the cache itself could consume a lot of memory, depending on how many unique output CSV files there are and how many of them can be open at once — but using a lot of memory is what makes it run faster. You'll need to experiment and manually tune the MAX_OPEN value to find the best trade-off between speed and memory use, while staying under the limit your OS places on how many files may be open at one time.
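One way to stay under that OS limit programmatically (a Unix-only sketch using the stdlib resource module; this is an assumption on my part, not part of the answer) is to read the process's actual file-descriptor limit and derive MAX_OPEN from it:

```python
import resource

# Per-process limit on open file descriptors (soft limit is what applies).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

# Leave headroom for stdin/stdout/stderr, the input file, logs, etc.
MAX_OPEN = max(2, soft - 64)
print(f"soft fd limit = {soft}, using MAX_OPEN = {MAX_OPEN}")
```

The 64-descriptor headroom is an arbitrary illustrative margin; adjust it for whatever else your process keeps open.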

Also note that it could be made even more efficient by choosing which existing file entry to close more intelligently, rather than picking an open one at random. Whether that actually helps, though, depends on whether the data in the input file has any favorable grouping or ordering.
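For input where rows with the same key tend to arrive in runs, a least-recently-used eviction policy is one such "smarter" choice. Here's a minimal, hypothetical sketch of that idea using collections.OrderedDict — the name LRUWriterCache and its API are made up for illustration and are simpler than the class below:

```python
import csv
from collections import OrderedDict

class LRUWriterCache:
    """Keep at most max_open CSV files open; evict the least recently
    used one. Files seen before are re-opened in append mode."""

    def __init__(self, max_open, **csv_kwargs):
        self.max_open = max_open
        self.csv_kwargs = csv_kwargs
        self._open = OrderedDict()   # path -> (writer, file), LRU order
        self._seen = set()           # paths written before (re-open with 'a')

    def writer(self, path):
        if path in self._open:
            self._open.move_to_end(path)      # mark as most recently used
            return self._open[path][0]
        if len(self._open) >= self.max_open:  # evict least recently used
            _, (_, old_file) = self._open.popitem(last=False)
            old_file.close()
        mode = 'a' if path in self._seen else 'w'
        f = open(path, mode, newline='')
        self._open[path] = (csv.writer(f, **self.csv_kwargs), f)
        self._seen.add(path)
        return self._open[path][0]

    def close(self):
        for _, f in self._open.values():
            f.close()
        self._open.clear()
```

With LRU eviction, a key that just appeared stays open, so runs of identical keys never pay the close/re-open cost; with random eviction they sometimes do.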

import csv
import os
import random

class CSVWriterCache(dict):
    """ Dict subclass to cache pairs of csv files and associated
        csv.writers. When a specified maximum number of them already
        exist, a random one closed, but an entry for it is retained
        and marked "closed" so it can be re-opened in append mode
        later if it's ever referenced again. This limits the number of
        files open at any given time.
    """
    _CLOSED = None  # Marker to indicate that file has seen before.

    def __init__(self, max_open, **kwargs):
        self.max_open = max_open
        self.cur_open = 0  # Number of currently opened csv files.
        self.csv_kwargs = kwargs  # keyword args for csv.writer.

    # Adding the next two non-dict special methods makes the class a
    # context manager which allows it to be used in "with" statements
    # to do automatic clean-up.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __getitem__(self, k):
        if k not in self:
            return self.__missing__(k)
        entry = self.get(k)  # dict.get() doesn't recurse into __getitem__.
        if entry is self._CLOSED:  # Needs to be re-opened in append mode.
            if self.cur_open == self.max_open:  # At the limit? Make room.
                self._close_random_open_entry()
            csv_file = open(k, 'a', newline='')
            csv_writer = csv.writer(csv_file, **self.csv_kwargs)
            self.cur_open += 1
            entry = (csv_writer, csv_file)
            super().__setitem__(k, entry)  # Re-cache the re-opened pair.
        return entry

    def _close_random_open_entry(self):
        """ Close a randomly chosen entry that is still open, but keep
            its key in the dictionary (marked _CLOSED) so the file can
            be recognized as previously seen and re-opened in append
            mode later.
        """
        open_keys = [k for k, v in self.items() if v is not self._CLOSED]
        rand_key = random.choice(open_keys)
        self.get(rand_key)[1].close()
        self[rand_key] = self._CLOSED  # Mark as previously seen but closed.
        self.cur_open -= 1

    def __missing__(self, csv_file_path):
        """ Create a csv.writer corresponding to the file path and add it
            and the file to the cache.
        """
        if self.cur_open == self.max_open:  # At the limit? Make room.
            self._close_random_open_entry()

        csv_file = open(csv_file_path, 'w', newline='')
        csv_writer = csv.writer(csv_file, **self.csv_kwargs)
        self.cur_open += 1

        # Add pair to cache.
        super().__setitem__(csv_file_path, (csv_writer, csv_file))
        return csv_writer, csv_file

    # Added, non-standard dict method.
    def close(self):
        """ Close all the opened files in the cache and clear it out. """
        for key, entry in self.items():
            if entry is not self._CLOSED:
                entry[1].close()
                self[key] = self._CLOSED  # Not strictly necessary.
                self.cur_open -= 1  # For sanity check at end.
        self.clear()
        assert(self.cur_open == 0)  # Sanity check.

if __name__ == '__main__':
    file_path = "./master.tsv"
    save_path = "./data-sorted"
    MAX_OPEN  = 1000  # Number of opened files allowed (max is OS-dependent).
#    MAX_OPEN  = 2  # Use small value for testing.

    # Create output directory if it does not exist.
    if os.path.exists(save_path):
        if not os.path.isdir(save_path):
            raise RuntimeError("Path {!r} exists, but isn't a directory")
    else:
        print('Creating directory: {!r}'.format(save_path))
        os.makedirs(save_path)

    # Process the input file using a cache of csv.writers.
    with open(file_path, 'r') as masterfile, \
         CSVWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL) as csv_writer_cache:
        for line in masterfile:
            line_split = line.rstrip().split("|")
            cik = line_split[0].zfill(10)

            save_file_path = os.path.join(save_path, cik + ".csv")
            writer = csv_writer_cache[save_file_path][0]
            writer.writerow(line_split)

    print('{!r} file processing completed'.format(os.path.basename(file_path)))
