我有一个正在工作的python脚本,它的工作原理如下:
open("A", 'r')
open("B", 'r')
open("C", 'w')
for lineA in A:
part1, part2, part3 = lineA.split(' ')
for lineB in B:
if part2 in lineB:
C.write(lineB)
如果文件a的某一行存在,我想签入文件B。如果是这样的话,把文件B中的整行写在新的文件C中
这个过程以我设计它的方式有点耗时(1-我仍然认为自己是Python的noob,2-至少有4个IF语句在主FOR循环中运行),现在我已经开始使用比以前大200倍的输入文件,所以这里每个输入文件的时间大约是5个小时。在
我试过使用多重处理,但似乎无法使其发挥作用。 最初,我在main()函数中尝试了一个简单的代码,没有任何显著的改进,而且绝对没有使用多个CPU:
^{pr2}$然后我尝试了乔布斯的方法:
jobs = []
for i in range(4):
p = Process(target='myfunc')
jobs.append(p)
p.start()
p.join()
我在论坛上找到了一个pool示例,我在main函数中添加了Return语句:
def multiproc(arg1,arg2,arg3):
(...)
return lineB # example of Return statment
def main():
pool = Pool(4)
with open('file.txt', 'w') as map_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(multi_thread, map_file, 4)
if __name__ == "__main__":
main()
jobs方法实际上创建了文件,然后从零开始重新启动了3次整个过程。最后一个错误是:
io.UnsupportedOperation: not readable
我还假设我的返回语句打破了我的循环。。。 有什么建议可以为这段代码启用多处理,或者提高它的整洁性?在
谢谢!在
编辑: 按要求,这里是完整的混乱代码:
#!/usr/bin/python3
__author__ = 'daniel'
import os
import re
from multiprocessing import Process
from multiprocessing import Pool
import time
start_time = time.time()
def multi_thread(filePath, datasetFolder, mapFileDataset):
fout = open('outdude.txt', 'w')
cwd = os.getcwd()
cwdgen, sep, id = filePath.rpartition('/')
dataset = datasetFolder.rsplit("/",1)
dataset = dataset[1]
## Create file
for i in os.listdir(cwd):
if ".ped" in i:
sample_id, sep, rest = i.partition('.ped')
for i in os.listdir(cwd):
if sample_id+'.pileupgatk' in i and dataset in i:
pileup4map = open(i,'r')
snpcounter = sum(1 for _ in pileup4map)-1
pileup4map.seek(0)
mapout = open(sample_id+'.map', 'w')
counter = 1
for line in pileup4map:
if counter <= snpcounter:
mapFileData = open(datasetFolder+'/'+mapFileDataset,'r')
line = line.rstrip()
chro, coord, refb, rbase, qual = line.split(' ')
chrom = chro.strip("chr")
counter+=1
for ligna in mapFileData:
if coord in ligna:
k = re.compile(r'(?=%s )' % coord, re.I)
lookAhead = k.search(ligna)
k = re.compile(r'(?<= %s)' % coord, re.I)
lookBehind = k.search(ligna)
if lookAhead and lookBehind != None:
lignaChrom = ligna[:2].rstrip(' ')
if chrom == lignaChrom:
lignaOut = ligna.rstrip()
mapout.write(lignaOut+'\n')
## For POOL
return lignaOut
else:
pass
else:
pass
else:
pass
mapout.close()
def main():
#Multiproc
# p = Process(target=multi_thread, args=('/home/full_karyo.fa', '/home/haak15', 'dataPP.map'))
# p.start()
# p.join()
# print("--- %s seconds ---" % (time.time() - start_time))
#Jobs
# jobs = []
# for i in range(4):
# p = Process(target=multi_thread, args=('/home/full_karyo.fa', '/home/haak15', 'dataPP.map'))
# jobs.append(p)
# p.start()
# p.join()
#Pool
pool = Pool(4)
with open('file.txt', 'w') as map_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(multi_thread, map_file, 4)
print(results)
print("--- %s seconds ---" % (time.time() - start_time))
if __name__ == "__main__":
main()
编辑2: 按照roberte和bigc的建议,我重新编写了代码,现在它的速度提高了13倍,而且不再那么令人困惑。正如bigc所指出的,我使用的字典方法不像前一个那样需要I/O。我对速度很满意,所以我暂时不考虑多处理。谢谢你的评论!在
if makemap == True:
## Dictionary method - 13X faster
for i in os.listdir(cwd):
if ".ped" in i:
sample_id, sep, rest = i.partition('.ped')
for i in os.listdir(cwd):
if sample_id+'.pileupgatk' in i and dataset in i:
print("\n\t> Creating MAP file from sample: "+sample_id)
pileup4map = open(i,'r')
snpcounter = sum(1 for _ in pileup4map)-1
pileup4map.seek(0)
counter = 1
piledic = {}
for line in pileup4map:
if counter <= snpcounter:
line = line.rstrip()
#chr21 43805965 G G G
chro, coord, refb, rbase, qual = line.split(' ')
chrom = chro.strip("chr")
piledic[chrom,coord]=int(counter)
counter += 1
pileup4map.close()
mapFileData = open(datasetFolder+'/'+mapFileDataset,'r')
mapDic = {}
counterM =1
for ligna in mapFileData:
#22 Affx-19821577 0.737773 50950707 A G
chroMap,ident,prob,posMap,bas1,bas2 = ligna.split()
mapDic[chroMap,posMap]=int(counterM)
counterM +=1
listOfmatches = []
for item in piledic:
if item in mapDic:
listOfmatches.append(mapDic[item])
listOfmatches.sort()
mapWrite = open(sample_id+".map", 'w')
mapFileData.seek(0)
lineCounter = 1
for lignagain in mapFileData:
if lineCounter in listOfmatches:
mapWrite.write(lignagain)
lineCounter +=1
mapWrite.close()
mapFileData.close()
目前没有回答
相关问题 更多 >
编程相关推荐