我创建了一个默认情况下创建一个多处理进程的脚本;然后它可以正常工作。当启动多个进程时,它开始挂起,而不是总是在同一个地方。这个程序大约有700行代码,所以我将试着总结一下发生了什么。我想最大限度地利用我的多核,把最慢的任务并行化,也就是对齐DNA序列。为此,我使用subprocess模块调用一个命令行程序:“hmmsearch”,我可以通过/dev/stdin输入序列,然后通过/dev/stdout读取对齐的序列。我想挂起是因为这些从stdout/stdin读/写的多个子进程实例,我真的不知道解决这个问题的最佳方法。。。 我在调查os.fdopen操作系统(…)&;os.tmp文件或创建临时文件,以通过管道()刷新数据。但是,我以前从未使用过这两种方法,我无法想象如何使用subprocess模块来实现这一点。理想情况下,我希望完全不使用硬盘驱动器,因为管道在高吞吐量数据处理方面更好! 任何帮助都将是超级美妙的!!在
import multiprocessing, subprocess
from Bio import SeqIO
class align_seq( multiprocessing.Process ):
def __init__( self, inPipe, outPipe, semaphore, options ):
multiprocessing.Process.__init__(self)
self.in_pipe = inPipe ## Sequences in
self.out_pipe = outPipe ## Alignment out
self.options = options.copy() ## Modifiable sub-environment
self.sem = semaphore
def run(self):
inp = self.in_pipe.recv()
while inp != 'STOP':
seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
# HMM is a file location.
align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
self.sem.acquire()
align_process.stdin.write( seq_record.format('fasta') )
align_process.stdin.close()
for seq in SeqIO.parse( align_process.stdout, 'stockholm' ): # get the alignment output
self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
align_process.wait() # Don't know if there's any need for this??
self.sem.release()
align_process.stdout.close()
inp = self.in_pipe.recv()
self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles.
self.out_pipe.close()
经过一段时间的调试,我发现了一个问题,这个问题一直存在,而且还没有完全解决,但是在调试过程中还解决了其他一些效率低下的问题。 有两个初始的feeder函数,这个align_-seq类和一个文件解析器parseHMM()它将位置特定的评分矩阵(PSM)加载到字典中。 然后,主父进程将对齐与PSM进行比较,使用一个字典(一个字典)作为指向每个余数的相关分数的指针。为了计算分数,我想我有两个分开的多处理过程类,一个classlogScore()计算对数优势比(使用数学实验());我将这一个并行化;它将计算出的分数排队到最后一个进程,sumScore()它只是将这些分数相加(使用数学.fsum), 将总和和所有职位特定的分数作为字典返回父进程。 即 队列.put([求和,{剩余位置:位置特定分数。。。} ] ) 我觉得这让我很困惑(排队太多了!),所以我希望读者们能够理解。。。在完成以上所有计算之后,我给出了将累积分数保存为制表符分隔的输出的选项。这就是它现在(从昨晚开始)有时会中断的地方,因为我保证它会打印出每个应该有分数的位置的分数。我认为,由于延迟(计算机计时不同步),有时先放入队列的logScore的内容不会首先到达sumScore。 为了让sumScore知道何时返回计数并重新开始,我将'endSEQ'放入执行计算的最后一个logScore进程的队列中。我想它也应该最后到达sumScore,但情况并非总是如此,只是有时会断裂。所以现在我不再遇到死锁,而是在打印或保存结果时出现一个KeyError。 我相信有时候获取keyror的原因是因为我为每个logScore进程创建了一个队列,但是它们应该使用相同的队列。现在,我有东西在哪比如:在
^{pr2}$而我应该只创建一个队列在所有logScore实例之间共享。i、 e
logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
我还想并行化简单的任务,为此我创建了一个小python脚本。你可以看看: http://bioinf.comav.upv.es/psubprocess/index.html
它比您想要的要通用一些,但对于简单的任务来说,使用起来非常容易。这至少和你有些关系。在
何塞·布兰卡
这不是流水线的工作原理。。。但是为了让你放松一下,这里有一段subprocess documentation的节选:
最有可能出现故障的地方是与主进程的通信或信号量的管理。可能状态转换/同步由于错误而没有按预期进行?我建议通过在每次阻塞调用之前和之后添加日志/打印语句来进行调试—在这里您与主进程通信,在那里获取/释放信号量,以缩小出错的范围。在
我也很好奇-信号量是绝对必要的吗?在
它可能是子进程中的死锁,您是否尝试过使用通信方法而不是等待? http://docs.python.org/library/subprocess.html
相关问题 更多 >
编程相关推荐