回答此问题可获得 20 贡献值,回答如果被采纳可获得 50 分。
<p>我希望运行总共<strong><em>nananalysis=25</em></strong>Abaqus模型,每个模型都使用X个核数,我可以同时运行这些模型的<strong><em>nParallelLoops=5</em></strong>。如果当前5个分析中的一个完成,则应开始另一个分析,直到所有<strong><em>分析</em></strong>都完成。在</p>
<p>我根据<strong>1</strong>和<strong>2</strong>中发布的解决方案实现了下面的代码。但是,我遗漏了一些东西,因为所有的<strong><em>nananalysis</em></strong>都试图从“一次”开始,代码死锁,而且没有完成任何分析,因为许多分析可能希望使用与已经开始的分析使用的相同的核心。在</p>
<ol>
<li><a href="https://stackoverflow.com/questions/9874042/using-pythons-multiprocessing-module-to-execute-simultaneous-and-separate-seawa">Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs</a></li>
<li><a href="https://stackoverflow.com/questions/37169336/how-to-parallelize-this-nested-loop-in-python-that-calls-abaqus">How to parallelize this nested loop in Python that calls Abaqus</a></li>
</ol>
<pre class="lang-py prettyprint-override"><code>def runABQfile(*args):
import subprocess
import os
inpFile,path,jobVars = args
prcStr1 = (path+'/runJob.sh')
process = subprocess.check_call(prcStr1, stdin=None, stdout=None, stderr=None, shell=True, cwd=path)
def safeABQrun(*args):
import os
try:
runABQfile(*args)
except Exception as e:
print("Tread Error: %s runABQfile(*%r)" % (e, args))
def errFunction(ppos, *args):
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures import wait
with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,nAnalysis)) # 5Nodes
wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
</code></pre>
<p>到目前为止,我能够运行的唯一方法是修改<code>errFunction</code>以便在下面的时间使用5个分析。然而,这种方法有时会导致在每个组(每个<code>ProcessPoolExecutor</code>调用)中其中一个分析花费的时间比其他4个要长得多,因此,尽管资源(核心)可用,下一个5人组将无法启动。最终,这将有更多的时间来完成所有25个模型。在</p>
^{pr2}$
<p>我尝试使用<code>as_completed</code>函数,但似乎也不起作用。在</p>
<p>请你帮我弄清楚正确的并行化,这样我就可以运行一个<strong><em>nananalysis</em></strong>,同时总是并行运行<strong><em>nParallelLoops</em></strong>?
谢谢你的帮助。
我使用的是python2.7</p>
<p>最好的,
大卫P</p>
<hr/>
<p><strong>更新日期:2016年7月30日</strong>:</p>
<p>我在<code>safeABQrun</code>中引入了一个循环,它管理5个不同的“队列”。循环是必要的,以避免分析试图在节点中运行而另一个节点仍在运行。分析被预先配置为在开始任何实际分析之前在请求的节点之一中运行。在</p>
<pre class="lang-py prettyprint-override"><code>def safeABQrun(*list_args):
import os
inpFiles,paths,jobVars = list_args
nA = len(inpFiles)
for k in range(0,nA):
args = (inpFiles[k],paths[k],jobVars[k])
try:
runABQfile(*args) # Actual Run Function
except Exception as e:
print("Tread Error: %s runABQfile(*%r)" % (e, args))
def errFunction(ppos, *args):
with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args) # 5Nodes
for f in as_completed(futures):
print("|=== Finish Process Train %d ===|" % futures[f])
if f.exception() is not None:
print('%r generated an exception: %s' % (futures[f], f.exception()))
</code></pre>