Python中多处理池中作业的动态重排序

2024-09-27 19:19:18 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在编写一个python脚本(适用于cygwin和linux环境),在使用命令行运行的程序上运行回归测试子流程.Popen(). 基本上,我有一组作业,其中的一个子集需要根据开发人员的需要运行(大约10到1000个)。每项工作可能需要几秒钟到20分钟才能完成。在

我的作业在多个处理器上成功运行,但我试图通过智能地(基于过去的性能)命令作业先运行较长的作业来节省一些时间。复杂的是,一些工作(稳态计算)需要先于其他工作(基于稳态确定的初始条件的瞬态)运行。在

我当前处理此问题的方法是在同一进程上递归运行父作业和所有子作业,但有些作业有多个长期运行的子作业。一旦父作业完成,我希望将子作业添加回池中以将其分发给其他进程,但需要将它们添加到队列的头部。我不确定我能不能用它多处理.池. 我找了经理的例子,但他们似乎都是基于人脉关系,并不是特别适用。任何形式的代码或链接到一个好的多处理教程(我在谷歌上搜索…)的帮助将不胜感激。以下是我目前所得到的代码的框架,评论指出了我希望在其他处理器上产生的子作业。在

import multiprocessing
import subprocess

class Job(object):
  def __init__(self, popenArgs, runTime, children)
    self.popenArgs = popenArgs #list to be fed to popen
    self.runTime = runTime #Approximate runTime for the job
    self.children = children #Jobs that require this job to run first

def runJob(job):
  subprocess.Popen(job.popenArgs).wait()
  ####################################################
  #I want to remove this, and instead kick these back to the pool
  for j in job.children: 
    runJob(j)
  ####################################################

def main(jobs):
  # This jobs argument contains only jobs which are ready to be run
  # ie no children, only parent-less jobs
  jobs.sort(key=lambda job: job.runTime, reverse=True)
  multiprocessing.Pool(4).map(runJob, jobs)

Tags: to代码self进程def作业jobsjob
1条回答
网友
1楼 · 发布于 2024-09-27 19:19:18

首先,让我重复一下arminrigo的评论:这里没有理由使用多个进程而不是多个线程。在控制过程中,您大部分时间都在等待子进程完成;您没有CPU密集型的工作要并行化。在

使用线程也会使您更容易解决主要问题。现在,您将作业存储在其他作业的属性中,即隐式依赖关系图。您需要一个单独的数据结构,根据调度对作业进行排序。此外,每个作业树当前都绑定到一个工作进程。您希望将工作人员与用于保存作业的数据结构分离。然后每个工人从同一个任务队列中提取作业;在一个工人完成其作业后,它将作业的子作业排队,然后由任何可用的工人处理。在

由于您希望在父作业完成时将子作业插入到行的前面,因此一个类似堆栈的容器似乎适合您的需要;Queue模块提供了一个线程安全的{}类,您可以使用它。在

import threading
import subprocess
from Queue import LifoQueue

class Job(object):
  def __init__(self, popenArgs, runTime, children):
    self.popenArgs = popenArgs
    self.runTime = runTime
    self.children = children

def run_jobs(queue):
  while True:
    job = queue.get()
    subprocess.Popen(job.popenArgs).wait()
    for child in job.children: 
      queue.put(child)
    queue.task_done()

# Parameter 'jobs' contains the jobs that have no parent.
def main(jobs):
  job_queue = LifoQueue()
  num_workers = 4
  jobs.sort(key=lambda job: job.runTime)
  for job in jobs:
    job_queue.put(job)
  for i in range(num_workers):
    t = threading.Thread(target=run_jobs, args=(job_queue,))
    t.daemon = True
    t.start()
  job_queue.join()

注意以下几点:(1)通过监视工作线程,我们无法知道何时所有的工作都完成了,因为它们不跟踪要完成的工作。这是排队的任务。因此,主线程监视队列对象以了解所有工作何时完成(job_queue.join())。因此,我们可以将工作线程标记为守护进程线程,因此只要主线程执行此操作,进程就会退出,而无需等待工作线程。因此,我们避免了在主线程和工作线程之间进行通信的必要性,以便告诉工作线程什么时候退出循环并停止。在

(2)当所有已排队的任务都被标记为完成时(特别是,当task_done()被调用的次数等于已排队的项目数)时,我们知道所有的工作都已完成。使用队列为空作为所有工作都已完成的条件是不可靠的;在从队列中弹出一个作业和将该作业的孩子排队之间,队列可能会暂时处于一个令人误解的空状态。在

相关问题 更多 >

    热门问题