用islice和多处理技术批量读取和处理大型文本文件

2024-06-25 23:20:52 发布

您现在位置:Python中文网/ 问答频道 /正文

代码不会返回任何东西,它会一直运行。请帮助处理代码段。仅供参考:我是第一次使用multiprocessing。你知道吗

我的本地内存很低,因此从zip文件中提取数据。我的想法是使用islice一次读取n行,然后使用process_logBatch()处理它们。你知道吗

在windows机器上运行此代码-Jupyter笔记本。你知道吗

import multiprocessing as mp
import zipfile
from itertools import islice
import time
#import pandas as pd  # Unused.

def process_logBatch(next_n_lines):
    l = [random.randint(0,100) for i in range(5)]
    print(l)
    return l

def collect_results(result):
    results.extend(result)

pool = mp.Pool(processes=(mp.cpu_count()-1))

results = []

with zipfile.ZipFile('log.zip', 'r') as z:
    with z.open('log.txt') as f:

        while True:
            print(f.closed)
            next_n_lines = [x.decode("utf-8").strip() for x in islice(f, 2)]

            if not next_n_lines:
                break

            try:
                pool.apply_async(process_logBatch, args=(next_n_lines, ), callback=collect_results)
            except Exception as e:
                print(e)

            if counter == 2:
                break
        pool.close()
        pool.join()

print(results)


Tags: 代码importasmpzipmultiprocessingprocessresults
1条回答
网友
1楼 · 发布于 2024-06-25 23:20:52

有几个问题。一种是在Windows上,您需要一个if __name__ == '__main__':语句来保护主模块,如multiprocessing模块的documentation中标题为“主模块的安全导入”一节所示。你知道吗

然而,第二件事并不是那么容易解决的。每个进程都在自己的内存空间中运行,因此它们并不都有相同的results列表。为了避免这种情况,我转而使用Pool.map_async(),并在所有子进程结束时收集结果。你知道吗

以下是我认为可行的方法(基于您的示例代码):

import multiprocessing as mp
import zipfile
from itertools import islice
import time
#import pandas as pd  # Unused.
import random  # Added.

def process_logBatch(next_n_lines):
    l = [random.randint(0,100) for i in range(5)]
    print(l)
    return l

if __name__ == '__main__':

# Not longer needed.
#    def collect_results(result):
#        results.extend(result)

    pool = mp.Pool(processes=(mp.cpu_count()-1))

    with zipfile.ZipFile('log.zip', 'r') as z:
        with z.open('log.txt') as f:

            counter = 0  # Added to avoid NameError because undefined.

            while True:
                next_n_lines = [x.decode("utf-8").strip() for x in islice(f, 2)]

                if not next_n_lines:
                    break

                try:
                    results = pool.map_async(process_logBatch, next_n_lines)
                except Exception as e:
                    print(e)

                if counter == 2:
                    break

            pool.close()
            pool.join()

    print(results.get())

相关问题 更多 >