多线程同时读取和处理一个巨大的文件(内存太大)

2024-09-30 07:27:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我有下面的代码运行非常慢。这是一个程序,分割一个大文件(80千兆位)并将其放入一个树型文件夹结构中,以便快速查找。为了帮助您理解代码,我在代码中做了一些注释。在

# Libraries
import os


# Variables
file="80_gig_file.txt"
outputdirectory="sorted"
depth=4 # This is the tree depth


# Preperations
os.makedirs(outputdirectory)

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped=''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed=line_stripped[::-1]
    file=outputdirectory
    # Create path location in folderbased tree
    for i in range(min((depth),len(line_stripped))):
        file=os.path.join(file,line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file=file+"-file"
    # This is the operation that slows everything down. 
    # It opens, writes and closes a lot of small files. 
    # I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
    f = open(file, "a")
    f.write(line)
    f.close()


# Read file line by line and by not loading it entirely in memory
# Here it is possible to work with a queue I think, but how to do it properly without loading too much in memory?
with open(file) as infile:
    for line in infile:
        pipeline(line)

有没有办法让多线程工作?因为我自己在网上找到了几个例子,它把所有的东西都放在内存里,导致我的电脑多次冻结。在


Tags: andthepathinforisoswith
1条回答
网友
1楼 · 发布于 2024-09-30 07:27:40

首先,(IMO)最简单的解决方案

如果行是完全独立的,只需将文件分成N个块,将文件名传递给open作为程序参数,并在多个命令行上手动启动当前脚本的多个实例。在

优点:

  • 无需钻研多处理、进程间通信等
  • 不需要对代码做太多修改

缺点:

  • 您需要对大文件进行预处理,并将其拆分为块(尽管这将比当前的执行时间快得多,因为您不会有每行一个打开-关闭的场景)
  • 您需要自己启动进程,为每个进程传递适当的文件名

这将实施为:

预处理:

APPROX_CHUNK_SIZE = 1e9 #1GB per file, adjust as needed
with open('big_file.txt') as fp:
  chunk_id = 0
  next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
  while next_chunk:
    with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
      ofp.writelines(next_chunk)
    chunk_id += 1
    next_chunk = fp.readlines(APPROX_CHUNK_SIZE)

^{} docs

If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (possibly after rounding up to an internal buffer size) are read.

这样做并不能保证所有块中的行数都是偶数,但会使预处理快得多,因为您是按块读取而不是逐行读取。根据需要调整块大小。 另外,请注意,通过使用readlines我们可以确保在块之间不会有行被打断,但是由于函数返回一个行列表,所以我们使用writelines将其写入输出文件(这相当于对列表和ofp.write(line)进行循环)。为了完整起见,让我注意到,您还可以连接内存中的所有字符串并只调用write一次(即,do ofp.write(''.join(next_chunk))),这可能会给您带来一些(小的)性能优势,带来(大大)更高的RAM使用率。在

主脚本:

您只需要在顶部进行修改:

^{pr2}$

通过使用argv可以将命令行参数传递给程序(在本例中,是要打开的文件)。然后,只需将脚本运行为:

python进程_文件.py大_文件_0.txt

这将运行一个进程。打开多个终端并对每个终端运行相同的命令big_file_N.txt,它们将彼此独立。在

注意:我使用argv[1],因为对于所有程序,argv(即,argv[0])的第一个值总是程序名。在


然后,multiprocessing

第一个解决方案虽然有效,但并不十分优雅,特别是如果从80GB大小的文件开始,将有80个文件。在

一个更干净的解决方案是使用python的multiprocessing模块(重要提示:不要threading!如果您不知道两者的区别,请查找“全局解释器锁”以及为什么python中的多线程处理不能像您想象的那样工作)。在

我们的想法是有一个“producer”进程来打开大文件并不断地将其中的行放入队列中。然后,一个“消费者”进程池从队列中提取行并进行处理。在

优点:

  • 一个剧本就能做到一切
  • 不需要打开多个终端和打字

缺点:

  • 复杂性
  • 使用进程间通信,这有一些开销

具体实施如下:

# Libraries
import os
import multiprocessing

outputdirectory="sorted"
depth=4 # This is the tree depth

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped=''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed=line_stripped[::-1]
    file=outputdirectory
    # Create path location in folderbased tree
    for i in range(min((depth),len(line_stripped))):
        file=os.path.join(file,line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file=file+"-file"
    # This is the operation that slows everything down. 
    # It opens, writes and closes a lot of small files. 
    # I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
    f = open(file, "a")
    f.write(line)
    f.close()

if __name__ == '__main__':
    # Variables
    file="80_gig_file.txt"

    # Preperations
    os.makedirs(outputdirectory)
    pool = multiprocessing.Pool() # by default, 1 process per CPU
    LINES_PER_PROCESS = 1000 # adapt as needed. Higher is better, but consumes more RAM

    with open(file) as infile:
        next(pool.imap(pipeline, infile, LINES_PER_PROCESS))
        pool.close()
        pool.join()

if __name__ == '__main__'行是将运行在每个进程上的代码与只在“父进程”上运行的代码分开的障碍。每个进程都定义pipeline,但只有父进程实际生成一个worker池并应用该函数。您可以找到有关multiprocessing.maphere的更多详细信息

编辑:

添加了关闭和加入池,以防止主进程退出并杀死进程中的子进程。在

相关问题 更多 >

    热门问题