将数据从子进程传输到线程进程

2024-09-28 20:55:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我想使用subprocess(和threading)模块:

  1. 调用将二进制文件处理为可读输出的程序
  2. 处理子进程的输出
  3. 两次重复同时进行1和2次(如果可能)

因为这是核苷酸分辨率的基因组尺度数据,取决于基因组大小。对于真实的人工数据,将子进程输出写入临时文件可能需要500GB的空间,如果可能的话,我希望避免这种情况。你知道吗

我尝试了两种策略,一种是将子进程输出保存在临时文件中,另一种是使用stdout=子流程管道Popen选项。无论哪种方式,这都是我的速率限制步骤,当我使用真实数据集时,它会变得非常受限。你知道吗

对于仅使用子流程模块的临时文件:

SC_paste_rep1 = subprocess.Popen(["bwtool","paste","typeA_rep1","typeB_rep1","-o=tmp_file_rep1.bg"])
SC_pasteM_rep2 = subprocess.Popen(["bwtool","paste","typeA_rep2","typeB_rep2","-o=tmp_file_rep2.bg"])

SC_paste_rep1.wait()
SC_paste_rep2.wait()

然后我一个接一个地遍历每个temp文件,并处理这些行(使用下面代码块中类似于f()的内容)。当我在测试集上计时脚本时,我得到:
实际7m9.100s
用户8m43.162s
系统0m21.572s

我的第二次尝试避免使用temp文件,并利用子进程和线程:

def struct_bind(input_pipe, output_pipe, calc_structScore):
        def f():
            try:
                DS_old_val = ''
                SS_old_val = ''
                old_chr = ''
                start_set = ''
                stop_set = ''
                for line in iter(input_pipe.readline, ''):
                    (out_line,start_set,stop_set,old_chr,DS_old_val,SS_old_val) = calc_structScore(line,start_set,stop_set,old_chr,DS_old_val,SS_old_val)
                    if out_line != "hold":
                        output_pipe.write(out_line)
            finally:
                try:
                    output_pipe.close()
                finally:
                    input_pipe.close()
        t = threading.Thread(target=f)
        t.daemon = True
        t.start()
        return t

output_rep1_open = open("output_rep1.bw",'w')
output_rep2_open = open("output_rep2.bw",'w')

SC_paste_rep1 = subprocess.Popen(["bwtool","paste","typeA_rep1","typeB_rep1"],stdout=subprocess.PIPE,bufsize=-1)
SC_mod_rep1 = struct_bind(SC_paste_rep1.stdout,output_rep1_open,calc_structScore)

SC_paste_rep2 = subprocess.Popen(["bwtool","paste","typeA_rep2","typeB_rep2"],stdout=subprocess.PIPE,bufsize=-1)
SC_mod_rep2 = struct_bind(SC_paste_rep2.stdout,output_rep2_open,calc_structScore)

SC_mod_rep1.join()
SC_mod_rep2.join()
SC_paste_rep1.wait()
SC_paste_rep2.wait()

当我用bufsize=-1计时时:
实11m59.916s
用户15.790s
系统6m4.131s

bufsize=0:
实际25m11.939s
用户26m11.768s
系统25m46.414s


在实际数据集上,使用临时文件可以获得:
实际151m3.040s
用户172m4.584s
系统8m33.428s

如果没有我可以得到:
实际248m52.712s
用户301m30.378s
系统151m23.694s

我的管道策略使使用临时文件的速度提高了2倍,这有什么问题吗?我可以做些什么来改进这一点?考虑到我一个接一个地处理2个临时文件,使用临时文件时的速度提高尤其令人惊讶。你知道吗


Tags: 用户output系统stdoutvalopenoldpaste