处理来自asyncmap的结果

2024-06-01 09:24:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我尝试使用ipython的并行处理来并行处理数据。我按照@minrk的指示回答关于how to get intermidiate results in ipython parallel processing?的问题。当其他任务的数据处理完成后,我会尽快保存它们。我的做法如下:

from IPython.parallel import Client

def specialfunc(param):
    import time
    if param > 8:
        raise IOError
    else:
        time.sleep( param)
        return param

client = Client()
balanced       = client.load_balanced_view()
balanced.block = False
param_list = range(10)   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)

然后,我可以在asyncmap上循环,结果准备好后就可以使用了:

^{pr2}$

问题是我的代码有时会抛出异常(上面的示例在调用参数超过8时强制执行IOError),这是我想要处理的。然而,一旦其中一个引擎发出抖动,整个异步映射“似乎”就完成了。在

审问的时候我注意到了asyncmap.metadata可以很好地找出哪个消息出错了(asyncmap.metadata[i] ['pyerr']),但是我不知道如何等待结果的到来。在

所以我的问题是,我应该如何处理从我的引擎异步到达的结果,即使它们有时会抛出异常。如何在引擎中捕捉异常而不影响控制器中的等待结果?在


Tags: 引擎importclientfalsetimeparamparallelipython
2条回答

ipython/*/examples/parallel/customresults.py的启发,我想出了这个解决方案:

asyncmap = balanced.map(specialfunc, param_list, ordered=False)

#create original mapping of msg_ids to parameters
# maybe just a quick way to find which parameter gave what result
msg_ids_to_parameters = dict(zip(asyncmap.msg_ids, param_list))

pending = set(asyncmap.msg_ids) # all queued jobs are pending
while pending:   # we'll come back as long as finished jobs haven't been looked at yet
    try:
        client.wait(pending, 1e-3)
    except parallel.TimeoutError:
        # ignore timeouterrors, since they only mean that at least one isn't done
        pass

    # finished is the set of msg_ids that are complete
    finished = pending.difference(client.outstanding)
    # update pending to exclude those that just finished
    pending = pending.difference(finished)
    for msg_id in finished:
        # we know these are done, so don't worry about blocking
        ar = client.get_result(msg_id)
        # checking whether any exceptions occurred when code ran on the engine
        if ar.metadata['pyerr'] is None:
            print "job id %s finished on engine %i " % (msg_id, ar.engine_id)
            print "and results for parameter %i :" % msg_ids_to_parameters[msg_id]
            # note that each job in a map always returns a list of length chunksize
            # even if chunksize == 1
            for res in ar.result:
                print " item %i \n" % res
        else:
            print('this went wrong for %i (%s)' % (msg_ids_to_parameters[msg_id], ar.metadata['pyerr']))

从本质上讲,示例代码的变化是查看元数据并查看是否记录了错误,并且只有在不继续并通过ar.result检索结果的情况下。在

我知道这听起来有点愚蠢,但您可以返回一个特殊值来指示错误,比如-1或{}或字符串。为了避开map_async,我所做的就是循环遍历参数并使用apply_async,将结果存储在一个列表中。然后,我循环查看列表,试图得到结果,并一次处理一个。看起来像这样:

 n_cores = len(c.ids)
 for n,p in enumerate( params ):
     core = c.ids[n%n_cores]
     calls.append( c[core].apply_async( f, p ) )

  #then you get the results

 while calls != []:
      for c in calls:
          try:
               result = c.get(1e-3)
               process(result)
               calls.remove( c )
               #in the case your call failed, you can apply_async again.
               # and append the call to calls.
          except parallel.TimeoutError:
               pass

或者使用c[core].apply()并用c.ready()检查调用。基本上是一样的,没有异常处理。令人讨厌的是,这会占用大量内存,因为每次调用的results和其他{}很难清除。在

我在做一件类似的事情here,我决定map_async对我来说不起作用。This也可能是相关的,以防您决定采用这种方法。在

干杯。在

PS:我认为基本上这就是您上面实现的,但是我发现单独处理这些调用然后将它们堆叠到映射中更为自然,特别是如果您以后需要重新处理其中一些调用的话。在

相关问题 更多 >