如何在Python中使用循环追踪多个并行子进程

2024-10-02 04:32:09 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经编写了一个Python脚本,使用GDAL开源库和该库提供的命令行实用程序来平铺图像。首先,我读取一个输入数据集,它告诉我每个分片范围。然后,我遍历这些tiles并启动一个子进程来调用gdalwalp,以便将输入图像剪辑到循环中的当前tile。在

我不想用波彭,等等(),因为这将阻止同时处理磁贴,但我确实希望跟踪子进程返回的任何消息。此外,一旦创建了一个特定的tile,我需要使用gdalinfo计算新文件的统计信息,这需要另一个子进程。在

代码如下:

processing = {}
for tile in tileNums:
    subp = subprocess.Popen(['gdalwarp', '-ot', 'Int16', '-r', 'cubic', '-of', 'HFA', '-cutline', tileIndexShp, '-cl', os.path.splitext(os.path.basename(tileIndexShp))[0], '-cwhere', "%s = '%s'" % (tileNumField, tile), '-crop_to_cutline', os.path.join(inputTileDir, 'mosaic_Proj.vrt'), os.path.join(outputTileDir, "Tile_%s.img" % regex.sub('_', tile))], stdout=subprocess.PIPE)
    processing[tile] = [subp]

while processing:
    for tile, subps in processing.items():
        for idx, subp in enumerate(subps):
            if subp == None: continue
            poll = subp.poll()
            if poll == None: continue
            elif poll != 0:
                subps[idx] = None
                print tile, "%s Unsuccessful" % ("Retile" if idx == 0 else "Statistics")
            else:
                subps[idx] = None
                print tile, "%s Succeeded" % ("Retile" if idx == 0 else "Statistics")
                if subps == [None, None]:
                    del processing[tile]
                    continue
                subps.append(subprocess.Popen(['gdalinfo', '-stats', os.path.join(outputTileDir, "Tile_%s.img" % regex.sub('_',tile))], stdout=subprocess.PIPE))

在大多数情况下,这对我来说是可行的,但我看到的一个问题是,当它到达最后一个平铺时,它似乎创建了一个无限循环。我知道这不是最好的方法,但是我对subprocess模块非常陌生,我基本上就是把它放在一起,试图让它工作起来。在

有谁能推荐一种更好的方法来遍历tile列表,为每个可以并发处理的tile生成一个子进程,并在第一个子进程为每个tile完成时生成第二个子进程?在

更新: 谢谢你的建议。我试图重构上面的代码以利用多处理模块和池。在

新代码如下:

^{pr2}$

这给我带来了一些大麻烦。首先,我最终创建了许多新流程。大约一半是python.exe另一半是另一个gdal实用程序,我在上面的代码之前调用它来拼接输入的图像,如果它已经在另一个平铺方案中平铺(gdalbuildvrt.exe). 在所有python.exe以及gdalbuildvrt.exe正在创建的进程中,大约25%的CPU(Intel I7在超线程时有8个内核)和99%的16gb内存正在使用,计算机完全挂起。我甚至不能在任务管理器中或者通过taskkill命令行来终止进程。在

我错过了什么?在


Tags: path代码noneif进程osexesubprocess
2条回答

使用python multiprocessing模块来创建流程的Pool,而不是生成和管理单独的子流程。在

我还没有测试过,但它应该可以工作:

import Queue

from threading import Thread

class Consumer(Thread):
    def __init__(self, queue=None):
        super(Consumer, self).__init__()

        self.daemon = True
        self.queue = queue


    def run(self):
        while True:
            task = self.queue.get()

            # Spawn your process and .wait() for it to finish.

            self.queue.task_done()

if __name__ == '__main__':
     queue = Queue.Queue()

     for task in get_tasks():
         queue.put(task)

     # You spawn 20 worker threads to process your queue nonstop
     for i in range(20):
         consumer = Consumer(queue)
         consumer.start()

     queue.join()

基本上,您有一个队列,其中充满了需要完成的任务。然后,您只需生成20个工作线程来不断地从队列中提取新任务并同时处理它们。在

相关问题 更多 >

    热门问题