并行处理通过多处理过程()子类

2024-10-01 02:24:10 发布

您现在位置:Python中文网/ 问答频道 /正文

以下是我的程序布局:

有些进程专门用于从一长串url中提取、美化和编码HTML(因为美化对象不能被pickle)。这个HTML被put()转换成multiprocessing.JoinableQueue()。还有一些,将get()从队列中分离出来,重新整理和解析HTML,并填充一个包含一些数据的本地字典。然后,字典被传递到另一个队列中,在该队列中,一个专用线程将它们作为行写入一个文件。你知道吗

我正在创建这样的流程:

numProcesses = 6
numGetProcesses = 3

# Get HTML
getProcesses = []
for ii in range(numGetProcesses):
    newProcess = GetHTML(urlInputQ, htmlOutputQ)
    getProcesses.append(newProcess)
    newProcess.start()    

# Parse HTML
parseProcesses = []
for ii in range(numProcesses - numGetProcesses)
    newProcess = ParseHTML(htmlOutputQ, dataOutputQ)
    parseProcesses.append(newProcess)
    newProcess.start()

然后,如果GetHTML进程在ParseHTML进程之前完成它们的工作,我[认为我]终止它们,并创建新的ParseHTML进程:

for process in getProcesses: process.join() # Wait for HTML
# Dedicate newly freed CPU to parsing
print('Redistributing wealth...')
for ii in range(numGetProcesses):
    oldProcess = getProcesses.pop()
    oldProcess.terminate()
    newProcess = csdb.ParseHTML(htmlOutputQ, dataOutputQ)
    parseProcesses.append(newProcess)
    newProcess.start()

首先,我希望这将有3个进程用于获取HTML,3个进程用于解析HTML。然后,当HTML获取完成时,应该有6个进程解析HTML。出现了两个问题,我认为可能是我对事情的理解有误:

1)当我将numGetProcesses改为1或5时,我看不到程序的总速度有什么差别(大约有1000个url,大约3分钟就结束了)。你知道吗

2)我还发现在清空HTML get和HTML parse队列所需的时间上没有区别

3)消息“重新分配财富…”总是ParseHTML进程完全清空其队列前几秒钟弹出。也就是说,GetHTML进程(无论是1还是5)在同一时间完成,并且相对于ParseHTML进程在同一时间完成。你知道吗

在我看来,似乎每个函数都有固定数量的进程在运行,而不管我设置了多少个进程或启动了多少个进程。有人有什么想法吗?你知道吗


抱歉,我不能包含完整的工作代码。它太大了。不过,这里是GetHTML类的样子。ParseHTML类的设置与此类似,但它做了大量的工作:

class GetHTML(multiprocessing.Process):
    def __init__(self, inputQ, outputQ):
        multiprocessing.Process.__init__(self)
        self.inputQ = inputQ
        self.outputQ = outputQ
        self.data = df_base.copy()

    def run(self):
        while True:
            for key in self.data.keys(): self.data[key] = None
            info = self.inputQ.get()
            if info == "die": break
            self.data['id'] = info[1]
            self.data['date'] = info[2]
            uncookedHTML = CookSoup(info[0])
            encodedHTML = uncookedHTML.encode('utf-8')
            self.outputQ.put([self.data, encodedHTML])
            self.inputQ.task_done()

菜汤(url)

ua = 'Mozilla/5.0 (X11; Linux x86_64; rv:18.0) Gecko/20100101 Firefox/18.0 (compatible;)' #For User-Agent header
def CookSoup(url):
    request = urllib2.Request(url) # Requests URL
    request.add_header('User-Agent', ua) # Adds User Agent
    response = urllib2.urlopen(request) # Opens URL
    soup = BeautifulSoup(response) # Creates soup
    response.close() #Closes file
    return soup #Returns the HTML

Tags: inselfinfourlfordata队列进程