如何根据一个特定列的值将一个大文件分成小的块来进行多处理、多线程处理?

2024-09-30 22:26:20 发布

您现在位置:Python中文网/ 问答频道 /正文

我为生物过程https://codereview.stackexchange.com/questions/186396/solve-the-phase-state-between-two-haplotype-blocks-using-markov-transition-proba编写了一个python程序。在

如果你仔细研究这个程序,你会发现这个程序一次从两个连续的行(或键,VAL)计算数据时要花很多时间。我并没有把整个代码放在这里,但为了简单起见,我创建了一个模拟文件和模拟程序(如下所示),其在最简单的级别上的行为类似。在这个模拟程序中,我在计算len(vals)列,并将其写回输出文件。在

由于在原始程序(上面的链接)中执行for (k1, v1) and (k2, v2) ....时,计算受到CPU/GPU的限制,我希望通过-1)以尽可能快的方式读取内存中的整个数据2)通过唯一的chr字段3)将数据分成块进行计算4)将其写回文件中。那么,我该怎么做呢?

在给定的模拟文件中,计算太简单了,无法绑定到GPU/CPU,但我只想知道如果需要的话,我可以怎么做。在

注意:有太多人问我要实现什么-我正在尝试多处理/线程给定的问题。如果我把我最初的整个大程序放在这里,没人会去看它。所以,让我们来练习这个小文件和小python程序。在

以下是我的代码和数据:

my_data = '''chr\tpos\tidx\tvals
2\t23\t4\tabcd
2\t25\t7\tatg
2\t29\t8\tct
2\t35\t1\txylfz
3\t37\t2\tmnost
3\t39\t3\tpqr
3\t41\t6\trtuv
3\t45\t5\tlfghef
3\t39\t3\tpqr
3\t41\t6\trtu
3\t45\t5\tlfggg
4\t25\t3\tpqrp
4\t32\t6\trtu
4\t38\t5\tlfgh
4\t51\t3\tpqr
4\t57\t6\trtus
'''


def manipulate_lines(vals):
    vals_len = len(vals[3])
    return write_to_file(vals[0:3], vals_len)

def write_to_file(a, b):
    print(a,b)
    to_file = open('write_multiprocessData.txt', 'a')
    to_file.write('\t'.join(['\t'.join(a), str(b), '\n']))
    to_file.close()

def main():
    to_file = open('write_multiprocessData.txt', 'w')
    to_file.write('\t'.join(['chr', 'pos', 'idx', 'vals', '\n']))
    to_file.close()

    data = my_data.rstrip('\n').split('\n')


    for lines in data:
        if lines.startswith('chr'):
            continue
        else:
            lines = lines.split('\t')
        manipulate_lines(lines)


if __name__ == '__main__':
    main()

Tags: 文件to数据程序datalenfilewrite
2条回答

我只使用过几次线程,下面没有测试过这段代码,但是快速浏览一下,for循环确实是唯一可以从线程中获益的地方。在

不过,我还是让别人来决定吧。在

import threading

my_data = '''chr\tpos\tidx\tvals
2\t23\t4\tabcd
2\t25\t7\tatg
2\t29\t8\tct
2\t35\t1\txylfz
3\t37\t2\tmnost
3\t39\t3\tpqr
3\t41\t6\trtuv
3\t45\t5\tlfghef
3\t39\t3\tpqr
3\t41\t6\trtu
3\t45\t5\tlfggg
4\t25\t3\tpqrp
4\t32\t6\trtu
4\t38\t5\tlfgh
4\t51\t3\tpqr
4\t57\t6\trtus
'''


def manipulate_lines(vals):
    vals_len = len(vals[3])
    return write_to_file(vals[0:3], vals_len)

def write_to_file(a, b):
    print(a,b)
    to_file = open('write_multiprocessData.txt', 'a')
    to_file.write('\t'.join(['\t'.join(a), str(b), '\n']))
    to_file.close()

def main():
    to_file = open('write_multiprocessData.txt', 'w')
    to_file.write('\t'.join(['chr', 'pos', 'idx', 'vals', '\n']))
    to_file.close()

    data = my_data.rstrip('\n').split('\n')

    for lines in data:
        if not lines.startswith('chr'):
            lines = lines.split('\t')
        threading.Thread(target = manipulate_lines, args = (lines)).start()


if __name__ == '__main__':
    main()

使用多个进程处理数据时,要处理的一个问题是保持顺序。Python提出了一种相当好的处理方法,使用multiprocessing.Pool,它可以用来map对输入数据进行处理。这将负责按顺序返回结果。在

但是,处理可能仍然是无序的,因此要正确使用它,在子进程中只应运行处理,而不应运行IO访问。因此,要在您的案例中使用此方法,需要对代码执行一个小的重写,使所有IO操作都在主进程中进行:

from multiprocessing import Pool
from time import sleep
from random import randint

my_data = '''chr\tpos\tidx\tvals
2\t23\t4\tabcd
2\t25\t7\tatg
2\t29\t8\tct
2\t35\t1\txylfz
3\t37\t2\tmnost
3\t39\t3\tpqr
3\t41\t6\trtuv
3\t45\t5\tlfghef
3\t39\t3\tpqr
3\t41\t6\trtu
3\t45\t5\tlfggg
4\t25\t3\tpqrp
4\t32\t6\trtu
4\t38\t5\tlfgh
4\t51\t3\tpqr
4\t57\t6\trtus
'''

def manipulate_lines(vals):
    sleep(randint(0, 2))
    vals_len = len(vals[3])
    return vals[0:3], vals_len

def write_to_file(a, b):
    print(a,b)
    to_file = open('write_multiprocessData.txt', 'a')
    to_file.write('\t'.join(['\t'.join(a), str(b), '\n']))
    to_file.close()

def line_generator(data):
    for line in data:
        if line.startswith('chr'):
            continue
        else:
           yield line.split('\t')

def main():
    p = Pool(5)

    to_file = open('write_multiprocessData.txt', 'w')
    to_file.write('\t'.join(['chr', 'pos', 'idx', 'vals', '\n']))
    to_file.close()

    data = my_data.rstrip('\n').split('\n')

    lines = line_generator(data)
    results = p.map(manipulate_lines, lines)

    for result in results:
        write_to_file(*result)

if __name__ == '__main__':
    main()

此程序不会在列表的不同chr值之后拆分列表,而是直接从最大值为5(参数为Pool)的子进程中逐项处理列表。在

为了显示数据仍按预期顺序排列,我向manipulate_lines函数添加了一个随机睡眠延迟。这说明了这个概念,但可能无法给出加速的正确观点,因为休眠进程允许另一个进程并行运行,而计算量大的进程将在其所有运行时间内使用CPU。在

可以看出,一旦map调用返回,就必须完成对文件的写入,这将确保所有子进程都已终止并返回其结果。在场景后面进行这种通信会有相当多的开销,因此要使其受益,计算部分必须比写入阶段长得多,并且不能生成太多的数据来写入文件。在

此外,我还打破了生成器中的for循环。这样就可以根据请求使用multiprocessing.Pool的输入。另一种方法是预处理data列表,然后将该列表直接传递给Pool。不过,我发现生成器解决方案更好,而且峰值内存消耗更小。在

另外,关于多线程与多处理的评论;只要你计算大量的操作,你就应该使用多处理,这至少在理论上允许进程在不同的机器上运行。另外,在使用最多的Python实现中,线程遇到了另一个问题,即全局解释器锁(global interpreter lock,GIL),这意味着一次只能执行一个线程,因为解释器会阻止所有其他线程的访问。(也有一些例外,例如使用C语言编写的模块时,如numpy。在这些情况下,GIL可以在进行numpy计算时释放,但一般情况下不是这样的。)因此,线程主要用于程序被困在等待缓慢、无序的IO的情况。(插座、终端输入等)

相关问题 更多 >