Running a Program on Multiple Cores

Posted 2024-10-01 04:54:31


I am running a program in Python that uses threads to parallelize a task. The task is simple string matching: I match a large number of short strings against long strings from a database. When I tried to parallelize it, I decided to split the list of short strings into a number of sublists equal to the number of cores and run each sublist on a different core. However, when I run the task on 5 or 10 cores, it is about twice as slow as running it on just one core. What is the reason for this, and how can I fix it?

Edit: my code is shown below.

import sys
import os
import csv
import re
import threading
from Queue import Queue
from time import sleep
from threading import Lock


q_in = Queue()
q_out = Queue()
lock = Lock()

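# ceil: integer ceiling of a non-negative float (avoids importing math.ceil)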
def ceil(nu):
    if int(nu) == nu:
        return int(nu)
    else:
        return int(nu) + 1

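# opencsv: read the "Peptide" column from a CSV file, stripping modification
# annotations such as "(+15.99)" from each peptide string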
def opencsv(csvv):
    with open(csvv) as csvfile:
        peptides = []
        reader = csv.DictReader(csvfile)
        for row in reader:
            pept = str(row["Peptide"])
            pept = re.sub(r"\((\+\d+\.\d+)\)", "", pept)
            peptides.append(pept)
        return peptides

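# openfasta: load a FASTA file into a dict mapping each ">" header line to its
# concatenated sequence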
def openfasta(fast):
    with open(fast, "r") as fastafile:
        dic = {}
        for line in fastafile:
            l = line.strip()
            if l.startswith(">"):
                cur = l
                dic[l] = ""
            else:
                dic[cur] = dic[cur] + l
        return dic

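# match: naive left-to-right scan of `pattern` over `text` (case-insensitive),
# allowing at most one mismatch per occurrence; returns the start indices of matches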
def match(text, pattern):
    text = list(text.upper())
    pattern = list(pattern.upper())
    ans = []
    cur = 0
    mis = 0
    i = 0
    while True:
        if i == len(text):
            break
        if text[i] != pattern[cur]:
            mis += 1
            if mis > 1:
                mis = 0
                cur = 0
                continue
        cur = cur + 1
        i = i + 1
        if cur == len(pattern):
            ans.append(i - len(pattern))
            cur = 0
            mis = 0
            continue
    return ans

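# job: match every peptide in this chunk against every gene sequence; keep only
# peptides that match a single location and return the formatted output lines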
def job(pepts, outfile, genes):
    c = 0
    it = 0
    towrite = []
    for i in pepts:
        # if it % 1000 == 0:
            # with lock:
                # print float(it) / float(len(pepts))
        it = it + 1
        found = 0
        for j in genes:
            m = match(genes[j], i)
            if len(m) > 0:
                found = 1
                remb = m[0]
                wh = j
                c = c + len(m)
                if c > 1:
                    found = 0
                    c = 0
                    break
        if found == 1:
            towrite.append("\t".join([i, str(remb), str(wh)]) + "\n")
    return towrite


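# worker: take peptide chunks from q_in, run job() on each, and put the result
# lines on q_out; a None item tells the thread to exit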
def worker(outfile, genes):
    s = q_in.qsize()
    while True:
        item = q_in.get()
        print "\r{0:.2f}%".format(100.0 * (1.0 - float(q_in.qsize()) / float(s)))
        if item is None:
            break #kill thread
        pepts = item
        q_out.put(job(pepts, outfile, genes))
        q_in.task_done()

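# main: load the peptide CSV and the FASTA database, split the peptides into
# chunks, fill q_in, start the worker threads and wait for them to finish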
def main(args):
    num_worker_threads = int(args[4])

    pept = opencsv(args[1])
    l = len(pept)
    howman = num_worker_threads
    ll = ceil(float(l) / float(howman * 100))
    remain = pept
    pepties = []
    while len(remain) > 0:
        pepties.append(remain[0:ll])
        remain = remain[ll:]
    for i in pepties:
        print len(i)
    print l

    print "Csv file loaded..."
    genes = openfasta(args[2])
    out = args[3]
    print "Fasta file loaded..."

    threads = []

    with open(out, "w") as outfile:
        for pepts in pepties:
            q_in.put(pepts)

        for i in range(num_worker_threads):
            t = threading.Thread(target=worker, args=(outfile, genes, ))
            # t.daemon = True
            t.start()
            threads.append(t)

        q_in.join() # run workers

        # stop workers
        for _ in range(num_worker_threads):
            q_in.put(None)
        for t in threads:
            t.join()
            # print(t)

    return 0

if __name__ == "__main__":
    sys.exit(main(sys.argv))

It is important which gene among the long sequences each short sequence matches.


1 Answer

Posted 2024-10-01 04:54:31

This is almost certainly because of the GIL (Global Interpreter Lock) in CPython.

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once.

David Beazley's presentation at PyCon 2010 explains the GIL in detail. On pages 32 to 34 he shows why the same multithreaded code (a CPU-bound computation) performs worse on multiple cores than on a single core.

(with single core) Threads alternate execution, but switch far less frequently than you might imagine

With multiple cores, runnable threads get scheduled simultaneously (on different cores) and battle over the GIL

This experiment result from David visualizes how thread switching becomes more rapid as the number of CPUs increases.

Even though your job function contains some I/O, it looks more like a CPU-bound computation, given its three levels of nested loops (two in job and one in match).
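
To see this concretely, the following is a small benchmark sketch (not part of the original answer) that runs the same amount of pure CPU work once in a single thread and once split across four threads. Under CPython's GIL the threaded run is typically no faster, and on a multi-core machine often slower; the function name countdown and the loop size are arbitrary choices for illustration:

import threading
import time

def countdown(n):
    # pure CPU work: the GIL is never released voluntarily here
    while n > 0:
        n -= 1

N = 20 * 1000 * 1000

# all the work in one thread
t0 = time.time()
countdown(N)
print("single thread: %.2f s" % (time.time() - t0))

# the same total work split across four threads
threads = [threading.Thread(target=countdown, args=(N // 4,)) for _ in range(4)]
t0 = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join()
print("four threads : %.2f s" % (time.time() - t0))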

Changing the code to use multiprocessing will let you take advantage of multiple cores and may improve performance. How much you gain, however, depends on the amount of computation: whether the benefit of computing in parallel can outweigh the overhead that multiprocessing introduces, such as inter-process communication.
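
As a rough sketch of that change, here is one way the thread-and-queue machinery could be replaced by a process pool (Python 3). It assumes the question's job function and its genes dict are defined in the same module; the names _GENES, _init_worker, chunked, job_chunk and run_parallel are made up for illustration and are not part of the original code:

from multiprocessing import Pool, cpu_count

_GENES = None  # per-process copy of the gene database, set by _init_worker

def _init_worker(genes):
    # runs once in every worker process so the gene dict is not shipped with each task
    global _GENES
    _GENES = genes

def chunked(items, size):
    # split a list into consecutive chunks of at most `size` elements
    return [items[i:i + size] for i in range(0, len(items), size)]

def job_chunk(pepts):
    # thin wrapper around the question's job(); its outfile argument is unused there
    return job(pepts, None, _GENES)

def run_parallel(peptides, genes, outname, workers=None):
    workers = workers or cpu_count()
    chunks = chunked(peptides, max(1, len(peptides) // (workers * 100)))
    with Pool(processes=workers, initializer=_init_worker, initargs=(genes,)) as pool:
        results = pool.map(job_chunk, chunks)
    with open(outname, "w") as outfile:
        for lines in results:
            outfile.writelines(lines)

Depending on the start method, the gene database is either inherited via fork or pickled once per worker at start-up, and every peptide chunk and its result lines still travel between processes, so the speed-up only materializes if the matching work per chunk outweighs that communication cost.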
