"Python多进程,每个进程拥有自己的子进程(Kubuntu,Mac)"

2024-09-28 22:23:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我创建了一个默认情况下创建一个多处理进程的脚本;然后它可以正常工作。当启动多个进程时,它开始挂起,而不是总是在同一个地方。这个程序大约有700行代码,所以我将试着总结一下发生了什么。我想最大限度地利用我的多核,把最慢的任务并行化,也就是对齐DNA序列。为此,我使用subprocess模块调用一个命令行程序:“hmmsearch”,我可以通过/dev/stdin输入序列,然后通过/dev/stdout读取对齐的序列。我想挂起是因为这些从stdout/stdin读/写的多个子进程实例,我真的不知道解决这个问题的最佳方法。。。 我在调查os.fdopen操作系统(…)&;os.tmp文件或创建临时文件,以通过管道()刷新数据。但是,我以前从未使用过这两种方法,我无法想象如何使用subprocess模块来实现这一点。理想情况下,我希望完全不使用硬盘驱动器,因为管道在高吞吐量数据处理方面更好! 任何帮助都将是超级美妙的!!在

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

经过一段时间的调试,我发现了一个问题,这个问题一直存在,而且还没有完全解决,但是在调试过程中还解决了其他一些效率低下的问题。 有两个初始的feeder函数,这个align_-seq类和一个文件解析器parseHMM()它将位置特定的评分矩阵(PSM)加载到字典中。 然后,主父进程将对齐与PSM进行比较,使用一个字典(一个字典)作为指向每个余数的相关分数的指针。为了计算分数,我想我有两个分开的多处理过程类,一个classlogScore()计算对数优势比(使用数学实验());我将这一个并行化;它将计算出的分数排队到最后一个进程,sumScore()它只是将这些分数相加(使用数学.fsum), 将总和和所有职位特定的分数作为字典返回父进程。 即 队列.put([求和,{剩余位置:位置特定分数。。。} ] ) 我觉得这让我很困惑(排队太多了!),所以我希望读者们能够理解。。。在完成以上所有计算之后,我给出了将累积分数保存为制表符分隔的输出的选项。这就是它现在(从昨晚开始)有时会中断的地方,因为我保证它会打印出每个应该有分数的位置的分数。我认为,由于延迟(计算机计时不同步),有时先放入队列的logScore的内容不会首先到达sumScore。 为了让sumScore知道何时返回计数并重新开始,我将'endSEQ'放入执行计算的最后一个logScore进程的队列中。我想它也应该最后到达sumScore,但情况并非总是如此,只是有时会断裂。所以现在我不再遇到死锁,而是在打印或保存结果时出现一个KeyError。 我相信有时候获取keyror的原因是因为我为每个logScore进程创建了一个队列,但是它们应该使用相同的队列。现在,我有东西在哪比如:在

^{pr2}$

而我应该只创建一个队列在所有logScore实例之间共享。i、 e

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

Tags: toinself队列进程stdinstdoutmultiprocessing
3条回答

我还想并行化简单的任务,为此我创建了一个小python脚本。你可以看看: http://bioinf.comav.upv.es/psubprocess/index.html

它比您想要的要通用一些,但对于简单的任务来说,使用起来非常容易。这至少和你有些关系。在

何塞·布兰卡

这不是流水线的工作原理。。。但是为了让你放松一下,这里有一段subprocess documentation的节选:

stdin, stdout and stderr specify the executed programs’ standard input, standard output and standard error file handles, respectively. Valid values are PIPE, an existing file descriptor (a positive integer), an existing file object, and None. PIPE indicates that a new pipe to the child should be created. With None, no redirection will occur; the child’s file handles will be inherited from the parent.

最有可能出现故障的地方是与主进程的通信或信号量的管理。可能状态转换/同步由于错误而没有按预期进行?我建议通过在每次阻塞调用之前和之后添加日志/打印语句来进行调试—在这里您与主进程通信,在那里获取/释放信号量,以缩小出错的范围。在

我也很好奇-信号量是绝对必要的吗?在

它可能是子进程中的死锁,您是否尝试过使用通信方法而不是等待? http://docs.python.org/library/subprocess.html

相关问题 更多 >