我尝试使用ipython的并行处理来并行处理数据。我按照@minrk的指示回答关于how to get intermidiate results in ipython parallel processing?的问题。当其他任务的数据处理完成后,我会尽快保存它们。我的做法如下:
from IPython.parallel import Client
def specialfunc(param):
import time
if param > 8:
raise IOError
else:
time.sleep( param)
return param
client = Client()
balanced = client.load_balanced_view()
balanced.block = False
param_list = range(10) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)
然后,我可以在asyncmap上循环,结果准备好后就可以使用了:
^{pr2}$问题是我的代码有时会抛出异常(上面的示例在调用参数超过8时强制执行IOError),这是我想要处理的。然而,一旦其中一个引擎发出抖动,整个异步映射“似乎”就完成了。在
审问的时候我注意到了asyncmap.metadata可以很好地找出哪个消息出错了(asyncmap.metadata[i] ['pyerr']),但是我不知道如何等待结果的到来。在
所以我的问题是,我应该如何处理从我的引擎异步到达的结果,即使它们有时会抛出异常。如何在引擎中捕捉异常而不影响控制器中的等待结果?在
受ipython/*/examples/parallel/customresults.py的启发,我想出了这个解决方案:
从本质上讲,示例代码的变化是查看元数据并查看是否记录了错误,并且只有在不继续并通过
ar.result
检索结果的情况下。在我知道这听起来有点愚蠢,但您可以返回一个特殊值来指示错误,比如}或字符串。为了避开
-1
或{map_async
,我所做的就是循环遍历参数并使用apply_async
,将结果存储在一个列表中。然后,我循环查看列表,试图得到结果,并一次处理一个。看起来像这样:或者使用}很难清除。在
c[core].apply()
并用c.ready()
检查调用。基本上是一样的,没有异常处理。令人讨厌的是,这会占用大量内存,因为每次调用的results
和其他{我在做一件类似的事情here,我决定map_async对我来说不起作用。This也可能是相关的,以防您决定采用这种方法。在
干杯。在
PS:我认为基本上这就是您上面实现的,但是我发现单独处理这些调用然后将它们堆叠到映射中更为自然,特别是如果您以后需要重新处理其中一些调用的话。在
相关问题 更多 >
编程相关推荐