Python I/O multiprocessing without a return function

Posted 2024-09-30 22:11:37


I have a working Python script that does roughly the following:

open("A", 'r')
open("B", 'r')
open("C", 'w')
for lineA in A:
    part1, part2, part3 = lineA.split(' ')
    for lineB in B:
        if part2 in lineB:
            C.write(lineB)

I want to check whether part of each line of file A exists in file B. If it does, write the entire matching line of file B into a new file C.

The way I designed it, this process is rather time-consuming (1 - I still consider myself a Python noob, 2 - there are at least 4 if statements running inside the main for loop), and now that I have started working with input files about 200 times larger than before, it takes roughly 5 hours per input file.
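(For reference: the rescan of B for every line of A is what dominates the runtime. Collecting the keys from A into a set first allows a single pass over each file, which is essentially the dictionary method that Edit 2 below ends up using. A minimal sketch of the idea, assuming the match is against whole whitespace-separated fields and reusing the file names from the snippet above:)

keys = set()
with open("A") as A:
    for lineA in A:
        part1, part2, part3 = lineA.split(' ')
        keys.add(part2)

with open("B") as B, open("C", 'w') as C:
    for lineB in B:
        # set membership is O(1) per field instead of a full rescan of B
        if any(field in keys for field in lineB.split()):
            C.write(lineB)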

I have tried using multiprocessing but can't seem to make it work. At first I tried some simple code inside the main() function; there was no significant improvement, and it definitely wasn't using multiple CPUs:

p = Process(target=multi_thread, args=('/home/full_karyo.fa', '/home/haak15', 'dataPP.map'))
p.start()
p.join()

Then I tried the jobs approach:

jobs = []
for i in range(4):
    p = Process(target=myfunc)   # pass the function object, not the string 'myfunc'
    jobs.append(p)
    p.start()
for p in jobs:
    p.join()   # joining inside the start loop would run the processes one at a time
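(Note: Process does not split the work on its own; four processes with the same target and arguments run the same whole job four times, which matches the restarts described below. Each process needs its own slice of the input. A rough sketch of that idea, with a hypothetical myfunc that accepts a chunk of lines:)

from multiprocessing import Process

def myfunc(lines):
    ...   # process only this chunk of lines

if __name__ == "__main__":
    with open("A") as f:
        lines = f.readlines()
    chunks = [lines[i::4] for i in range(4)]   # four disjoint slices of the input
    jobs = [Process(target=myfunc, args=(chunk,)) for chunk in chunks]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()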

I found a Pool example on a forum and added a return statement to my function:

from multiprocessing import Pool

def multi_thread(arg1, arg2, arg3):
    (...)
    return lineB   # example of a return statement

def main():
    pool = Pool(4)
    with open('file.txt', 'w') as map_file:   # note: opened for writing only
        # chunk the work into batches of 4 lines at a time
        results = pool.map(multi_thread, map_file, 4)

if __name__ == "__main__":
    main()

The jobs approach did actually create the file, but then it restarted the whole process from scratch three more times. The last error was:

io.UnsupportedOperation: not readable

I also assume that my return statement is breaking my loop... Any suggestions on how to enable multiprocessing for this code, or otherwise make it cleaner?

Thanks!
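(For reference: the io.UnsupportedOperation error comes from passing map_file, which was opened in 'w' mode, as the iterable to pool.map; the pool then tries to read lines from a write-only handle. And a return inside the worker only finishes that one task, it does not break the overall map. A minimal sketch of the usual Pool pattern, with an illustrative worker name:)

from multiprocessing import Pool

def find_match(lineA):
    part1, part2, part3 = lineA.split(' ')
    with open('B') as B:   # still rescans B per line; the set approach above avoids this
        for lineB in B:
            if part2 in lineB:
                return lineB   # ends this one task only
    return None

def main():
    with Pool(4) as pool, open('A') as A, open('C', 'w') as C:
        # map the worker over a *readable* file, in chunks of 4 lines per task
        for result in pool.imap(find_match, A, 4):
            if result is not None:
                C.write(result)

if __name__ == "__main__":
    main()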

Edit: as requested, here is the complete messy code:

#!/usr/bin/python3
__author__ = 'daniel'

import os
import re
from multiprocessing import Process
from multiprocessing import Pool
import time
start_time = time.time()

def multi_thread(filePath, datasetFolder, mapFileDataset):
    fout = open('outdude.txt', 'w')   # note: opened but never used or closed
    cwd = os.getcwd()
    cwdgen, sep, id = filePath.rpartition('/')
    dataset = datasetFolder.rsplit("/",1)
    dataset = dataset[1]
    ## Create file
    for i in os.listdir(cwd):
        if ".ped" in i:
            sample_id, sep, rest = i.partition('.ped')
            for i in os.listdir(cwd):
                if sample_id+'.pileupgatk' in i and dataset in i:
                    pileup4map = open(i,'r')
                    snpcounter = sum(1 for _ in pileup4map)-1
                    pileup4map.seek(0)
                    mapout = open(sample_id+'.map', 'w')
                    counter = 1
                    for line in pileup4map:
                        if counter <= snpcounter:
                            mapFileData = open(datasetFolder+'/'+mapFileDataset,'r')   # reopened for every pileup line: the main I/O cost
                            line = line.rstrip()
                            chro, coord, refb, rbase, qual = line.split(' ')
                            chrom = chro.strip("chr")
                            counter+=1
                            for ligna in mapFileData:
                                if coord in ligna:
                                    k = re.compile(r'(?=%s )' % coord, re.I)   
                                    lookAhead = k.search(ligna)
                                    k = re.compile(r'(?<= %s)' % coord, re.I)   
                                    lookBehind = k.search(ligna)
                                    if lookAhead is not None and lookBehind is not None:
                                        lignaChrom = ligna[:2].rstrip(' ')
                                        if chrom == lignaChrom:
                                            lignaOut = ligna.rstrip()
                                            mapout.write(lignaOut+'\n')
                                            ## For POOL
                                            return lignaOut
                    mapout.close()


def main():
    #Multiproc
    # p = Process(target=multi_thread, args=('/home/full_karyo.fa', '/home/haak15', 'dataPP.map'))
    # p.start()
    # p.join()
    # print("--- %s seconds ---" % (time.time() - start_time))

    #Jobs
    # jobs = []
    # for i in range(4):
    #     p = Process(target=multi_thread, args=('/home/full_karyo.fa', '/home/haak15', 'dataPP.map'))
    #     jobs.append(p)
    #     p.start()
    #     p.join()

    #Pool
    pool = Pool(4)
    with open('file.txt', 'w') as map_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(multi_thread, map_file, 4)
        print(results)
    print("--- %s seconds ---" % (time.time() - start_time))


if __name__ == "__main__":
    main()
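(One more detail in the Pool attempt above: multi_thread takes three arguments, but pool.map passes each worker exactly one item from the iterable, so the call would fail even with a readable file. functools.partial can bind the constant arguments; a sketch reusing the paths from the commented-out Process call:)

from functools import partial
from multiprocessing import Pool

# pool.map supplies only the first argument per item, so fix the other two
worker = partial(multi_thread, datasetFolder='/home/haak15', mapFileDataset='dataPP.map')

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(worker, ['/home/full_karyo.fa'])   # each item becomes filePath
        print(results)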

Edit 2: Following roberte's and bigc's suggestions, I rewrote the code and it is now 13x faster and much less confusing. As bigc pointed out, the dictionary approach doesn't need the I/O the previous version did. I'm happy with the speed, so I'll leave multiprocessing aside for now. Thanks for the comments!

if makemap:
    ## Dictionary method - 13X faster
    for i in os.listdir(cwd):
        if ".ped" in i:
            sample_id, sep, rest = i.partition('.ped')
            for i in os.listdir(cwd):
                if sample_id+'.pileupgatk' in i and dataset in i:
                    print("\n\t> Creating MAP file from sample: "+sample_id)
                    pileup4map = open(i,'r')
                    snpcounter = sum(1 for _ in pileup4map)-1
                    pileup4map.seek(0)
                    counter = 1
                    piledic = {}
                    for line in pileup4map:
                        if counter <= snpcounter:
                            line = line.rstrip()
                            #chr21 43805965 G G G
                            chro, coord, refb, rbase, qual = line.split(' ')
                            chrom = chro.strip("chr")
                            piledic[chrom, coord] = counter
                            counter += 1

                    pileup4map.close()
                    mapFileData = open(datasetFolder+'/'+mapFileDataset,'r')
                    mapDic = {}
                    counterM = 1
                    for ligna in mapFileData:
                        #22 Affx-19821577     0.737773     50950707 A G
                        chroMap, ident, prob, posMap, bas1, bas2 = ligna.split()
                        mapDic[chroMap, posMap] = counterM
                        counterM += 1

                    listOfmatches = []
                    for item in piledic:
                        if item in mapDic:
                            listOfmatches.append(mapDic[item])
                    listOfmatches.sort()

                    mapWrite = open(sample_id+".map", 'w')
                    mapFileData.seek(0)
                    lineCounter = 1
                    for lignagain in mapFileData:
                        if lineCounter in listOfmatches:
                            mapWrite.write(lignagain)
                        lineCounter += 1
                    mapWrite.close()
                    mapFileData.close()
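(A possible further simplification, not in the original post: storing each map line, tagged with its line number, as the dictionary value removes the second pass over mapFileData, at the cost of keeping those lines in memory. A sketch reusing piledic and the other variables from the snippet above:)

mapDic = {}
with open(datasetFolder + '/' + mapFileDataset) as mapFileData:
    for counterM, ligna in enumerate(mapFileData, start=1):
        chroMap, ident, prob, posMap, bas1, bas2 = ligna.split()
        mapDic[chroMap, posMap] = (counterM, ligna)   # keep the whole line as the value

# sort by original line number to preserve the map file's order
matched = sorted(mapDic[key] for key in piledic if key in mapDic)
with open(sample_id + '.map', 'w') as mapWrite:
    for _, ligna in matched:
        mapWrite.write(ligna)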
