多处理.池:如何在旧流程结束时启动新流程?

2024-10-01 00:18:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用多处理池来管理细分过程(缩微胶片的OCRing页面)。通常情况下,在一个大约20个tesseract进程的池中,几个页面将更难OCR,因此这些进程比其他进程花费的时间要长得多。同时,这个池只是挂着,大部分cpu没有被利用。我想让这些散乱的人继续工作,但我也希望启动更多的进程来填满其他许多CPU,这些CPU现在闲置着,而这几个粘页正在完成。我的问题是:有没有办法加载新的进程来利用那些空闲的cpu。换言之,在等待整个游泳池完成之前,游泳池中的空位能否填满?在

我可以使用starmap的异步版本,然后在当前池下降到一定数量的活动进程时加载一个新的池。但这似乎不雅观。在需要的时候自动保持开槽会更优雅。在

我的代码现在是这样的:

def getMpBatchMap(fileList, commandTemplate, concurrentProcesses):
    mpBatchMap = []
    for i in range(concurrentProcesses):
        fileName = fileList.readline()
        if fileName:
            mpBatchMap.append((fileName, commandTemplate))
    return mpBatchMap

def executeSystemProcesses(objFileName, commandTemplate):
    objFileName = objFileName.strip()
    logging.debug(objFileName)
    objDirName = os.path.dirname(objFileName)
    command = commandTemplate.substitute(objFileName=objFileName, objDirName=objDirName)
    logging.debug(command)
    subprocess.call(command, shell=True)

def process(FILE_LIST_FILENAME, commandTemplateString, concurrentProcesses=3):
    """Go through the list of files and run the provided command against them,
    one at a time. Template string maps the terms $objFileName and $objDirName.

    Example:
    >>> runBatchProcess('convert -scale 256 "$objFileName" "$objDirName/TN.jpg"')
    """
    commandTemplate = Template(commandTemplateString)
    with open(FILE_LIST_FILENAME) as fileList:
        while 1:
            # Get a batch of x files to process
            mpBatchMap = getMpBatchMap(fileList, commandTemplate, concurrentProcesses)
            # Process them
            logging.debug('Starting MP batch of %i' % len(mpBatchMap))
            if mpBatchMap:
                with Pool(concurrentProcesses) as p:
                    poolResult = p.starmap(executeSystemProcesses, mpBatchMap)
                    logging.debug('Pool result: %s' % str(poolResult))
            else:
                break

Tags: ofthedebug进程loggingdef页面filename
1条回答
网友
1楼 · 发布于 2024-10-01 00:18:08

你在这里混东西。池始终保持一定数量的指定进程处于活动状态。只要您不关闭池,不管是手动关闭还是通过离开上下文管理器的with块,就不需要用进程重新填充池,因为它们不会去任何地方。在

您可能想说的是“任务”,即这些流程可以处理的任务。任务是传递给池方法的iterable的每个进程块。是的,有一种方法可以在池中对新任务使用空闲进程,然后再处理所有以前排队的任务。您已经为此选择了正确的工具,即池方法的异步版本。你要做的就是重新应用某种异步池方法。在

from multiprocessing import Pool
import os

def busy_foo(x):
    x = int(x)
    for _ in range(x):
        x - 1
    print(os.getpid(), ' returning: ', x)
    return x

if __name__ == '__main__':

    arguments1 = zip([222e6, 22e6] * 2)
    arguments2 = zip([111e6, 11e6] * 2)

    with Pool(4) as pool:

        results = pool.starmap_async(busy_foo, arguments1)
        results2 = pool.starmap_async(busy_foo, arguments2)

        print(results.get())
        print(results2.get())

输出示例:

^{pr2}$

上面要注意的是,进程3182和3185以更简单的任务结束,立即从第二个参数列表中的任务开始,而不必等待3181和3184首先完成。在

如果出于某种原因,您确实希望在每个进程处理一定数量的任务之后使用新的进程,那么maxtasksperchild参数是Pool。在那里,您可以指定池应该在多少个任务之后用新进程替换旧进程。此参数的默认值是None,因此池在默认情况下不替换进程。在

相关问题 更多 >