使用Concurrent.Futures.ProcessPoolExecutor运行同步和独立ABAQUS模型

2024-09-29 21:41:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我希望运行总共nananalysis=25Abaqus模型,每个模型都使用X个核数,我可以同时运行这些模型的nParallelLoops=5。如果当前5个分析中的一个完成,则应开始另一个分析,直到所有分析都完成。在

我根据12中发布的解决方案实现了下面的代码。但是,我遗漏了一些东西,因为所有的nananalysis都试图从“一次”开始,代码死锁,而且没有完成任何分析,因为许多分析可能希望使用与已经开始的分析使用的相同的核心。在

  1. Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs
  2. How to parallelize this nested loop in Python that calls Abaqus
def runABQfile(*args):    
    import subprocess
    import os

    inpFile,path,jobVars = args

    prcStr1 = (path+'/runJob.sh')

    process = subprocess.check_call(prcStr1, stdin=None, stdout=None, stderr=None, shell=True, cwd=path)

def safeABQrun(*args):
    import os

    try:
        runABQfile(*args)
    except Exception as e:
        print("Tread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait

    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,nAnalysis))  # 5Nodes
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

到目前为止,我能够运行的唯一方法是修改errFunction以便在下面的时间使用5个分析。然而,这种方法有时会导致在每个组(每个ProcessPoolExecutor调用)中其中一个分析花费的时间比其他4个要长得多,因此,尽管资源(核心)可用,下一个5人组将无法启动。最终,这将有更多的时间来完成所有25个模型。在

^{pr2}$

我尝试使用as_completed函数,但似乎也不起作用。在

请你帮我弄清楚正确的并行化,这样我就可以运行一个nananalysis,同时总是并行运行nParallelLoops? 谢谢你的帮助。 我使用的是python2.7

最好的, 大卫P


更新日期:2016年7月30日

我在safeABQrun中引入了一个循环,它管理5个不同的“队列”。循环是必要的,以避免分析试图在节点中运行而另一个节点仍在运行。分析被预先配置为在开始任何实际分析之前在请求的节点之一中运行。在

def safeABQrun(*list_args):
    import os

    inpFiles,paths,jobVars = list_args

    nA = len(inpFiles)
    for k in range(0,nA): 
        args = (inpFiles[k],paths[k],jobVars[k])
        try:
            runABQfile(*args) # Actual Run Function
        except Exception as e:
            print("Tread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args)  # 5Nodes

        for f in as_completed(futures):
            print("|=== Finish Process Train %d ===|" % futures[f])
            if f.exception() is not None:
               print('%r generated an exception: %s' % (futures[f], f.exception()))

Tags: toin模型importnoneosdefas
2条回答

我在safeABQrun中引入了一个循环,它管理5个不同的“队列”。循环是必要的,以避免分析试图在节点中运行而另一个节点仍在运行。分析被预先配置为在开始任何实际分析之前在请求的节点之一中运行。在

def safeABQrun(*list_args):
    import os

    inpFiles,paths,jobVars = list_args

    nA = len(inpFiles)
    for k in range(0,nA): 
        args = (inpFiles[k],paths[k],jobVars[k])
        try:
            runABQfile(*args) # Actual Run Function
        except Exception as e:
            print("Tread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args)  # 5Nodes

        for f in as_completed(futures):
            print("|=== Finish Process Train %d ===|" % futures[f])
            if f.exception() is not None:
               print('%r generated an exception: %s' % (futures[f], f.exception()))

我觉得没问题,但我不能按原样运行你的代码。试一下简单得多的东西,然后再添加一些东西,直到“问题”出现,怎么样?例如,以下是否显示了您想要的行为类型?它在我的机器上运行,但我运行的是python3.5.2。你说你运行的是2.7,但是Python2中没有concurrent.futures所以如果你使用的是2.7,你必须运行某个库的后台端口,也许问题就在这一点上。尝试以下方法有助于回答是否是这样:

from concurrent.futures import ProcessPoolExecutor, wait, as_completed

def worker(i):
    from time import sleep
    from random import randrange
    s = randrange(1, 10)
    print("%d started and sleeping for %d" % (i, s))
    sleep(s)

if __name__ == "__main__":
    nAnalysis = 25
    nParallelLoops = 5
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(worker, k), k) for k in range(nAnalysis))
        for f in as_completed(futures):
            print("got %d" % futures[f])

典型输出:

^{pr2}$

相关问题 更多 >

    热门问题