I am running a Python program that uses threads to execute a task in parallel. The task is simple string matching: I match a large number of short strings against long strings stored in a database. To parallelize it, I split the list of short strings into sublists, one per core, and run each sublist on a different core. However, when I run the task on 5 or 10 cores, it is about twice as slow as running it on a single core. What is the reason, and how can I fix it?

Edit: my code is below.
import sys
import os
import csv
import re
import threading
from Queue import Queue
from time import sleep
from threading import Lock

q_in = Queue()
q_out = Queue()
lock = Lock()

def ceil(nu):
    if int(nu) == nu:
        return int(nu)
    else:
        return int(nu) + 1

def opencsv(csvv):
    with open(csvv) as csvfile:
        peptides = []
        reader = csv.DictReader(csvfile)
        k = 0
        lon = ""
        for row in reader:
            pept = str(row["Peptide"])
            pept = re.sub("\((\+\d+\.\d+)\)", "", pept)
            peptides.append(pept)
    return peptides

def openfasta(fast):
    with open(fast, "r") as fastafile:
        dic = {}
        for line in fastafile:
            l = line.strip()
            if l[0] == ">":
                cur = l
                dic[l] = ""
            else:
                dic[cur] = dic[cur] + l
    return dic

def match(text, pattern):
    text = list(text.upper())
    pattern = list(pattern.upper())
    ans = []
    cur = 0
    mis = 0
    i = 0
    while True:
        if i == len(text):
            break
        if text[i] != pattern[cur]:
            mis += 1
            if mis > 1:
                mis = 0
                cur = 0
                continue
        cur = cur + 1
        i = i + 1
        if cur == len(pattern):
            ans.append(i - len(pattern))
            cur = 0
            mis = 0
            continue
    return ans

def job(pepts, outfile, genes):
    c = 0
    it = 0
    towrite = []
    for i in pepts:
        # if it % 1000 == 0:
        #     with lock:
        #         print float(it) / float(len(pepts))
        it = it + 1
        found = 0
        for j in genes:
            m = match(genes[j], i)
            if len(m) > 0:
                found = 1
                remb = m[0]
                wh = j
                c = c + len(m)
                if c > 1:
                    found = 0
                    c = 0
                    break
        if found == 1:
            towrite.append("\t".join([i, str(remb), str(wh)]) + "\n")
    return towrite

def worker(outfile, genes):
    s = q_in.qsize()
    while True:
        item = q_in.get()
        print "\r{0:.2f}%".format(1 - float(q_in.qsize()) / float(s))
        if item is None:
            break  # kill thread
        pepts = item
        q_out.put(job(pepts, outfile, genes))
        q_in.task_done()

def main(args):
    num_worker_threads = int(args[4])
    pept = opencsv(args[1])
    l = len(pept)
    howman = num_worker_threads
    ll = ceil(float(l) / float(howman * 100))
    remain = pept
    pepties = []
    while len(remain) > 0:
        pepties.append(remain[0:ll])
        remain = remain[ll:]
    for i in pepties:
        print len(i)
    print l
    print "Csv file loaded..."
    genes = openfasta(args[2])
    out = args[3]
    print "Fasta file loaded..."
    threads = []
    with open(out, "w") as outfile:
        for pepts in pepties:
            q_in.put(pepts)
        for i in range(num_worker_threads):
            t = threading.Thread(target=worker, args=(outfile, genes, ))
            # t.daemon = True
            t.start()
            threads.append(t)
        q_in.join()  # run workers
        # stop workers
        for _ in range(num_worker_threads):
            q_in.put(None)
        for t in threads:
            t.join()
            # print(t)
    return 0

if __name__ == "__main__":
    sys.exit(main(sys.argv))
What matters is that the short sequences are matched against the genes contained in the long sequences.
This is most likely caused by the GIL (Global Interpreter Lock) in CPython.

David Beazley's presentation at PyCon 2010 explains the GIL in detail. On slides 32 to 34 he shows why the same multithreaded code (a CPU-bound computation) performs worse on multiple cores than on a single core.

David's experiment results also visualize how thread switching becomes more frequent as the number of CPUs increases.
Even though your job function contains some I/O, judging from its three levels of nested loops (two in job and one in match) it is essentially a CPU-bound computation. Switching the code from multithreading to multiprocessing will let you actually use multiple cores and may improve performance. How much you gain, however, depends on the amount of computation: the benefit of parallel execution has to outweigh the overhead that multiprocessing introduces (such as inter-process communication).