多处理.池:在使用apply_async的回调选项时调用helper函数

2024-10-01 07:51:19 发布

您现在位置:Python中文网/ 问答频道 /正文

在调用iterable(?)之间,apply_async流是如何工作的函数和回调函数?在

设置:我正在读取2000文件目录中所有文件的一些行,有些有数百万行,有些只有几行。提取一些头/格式/日期数据以对每个文件进行字符化。这是在一台16 CPU的机器上完成的,所以对它进行多处理是有意义的。在

目前,预期的结果被发送到一个列表(ahlala),这样我就可以打印出来;稍后,它将被写入*.csv。这是我代码的一个简化版本,最初基于this非常有用的帖子。在

import multiprocessing as mp

def dirwalker(directory):
  ahlala = []

  # X() reads files and grabs lines, calls helper function to calculate
  # info, and returns stuff to the callback function
  def X(f): 
    fileinfo = Z(arr_of_lines) 
    return fileinfo 

  # Y() reads other types of files and does the same thing
  def Y(f): 
    fileinfo = Z(arr_of_lines)
    return fileinfo

  # results() is the callback function
  def results(r):
    ahlala.extend(r) # or .append, haven't yet decided

  # helper function
  def Z(arr):
    return fileinfo # to X() or Y()!

  for _,_,files in os.walk(directory):
    pool = mp.Pool(mp.cpu_count()
    for f in files:
      if (filetype(f) == filetypeX): 
        pool.apply_async(X, args=(f,), callback=results)
      elif (filetype(f) == filetypeY): 
        pool.apply_async(Y, args=(f,), callback=results)

  pool.close(); pool.join()
  return ahlala

注意,如果我将所有的帮助函数Z()放入X()Y()或{},那么代码就可以工作了,但是这是重复的还是可能慢于可能的速度?我知道每次函数调用都会调用回调函数,但回调函数何时调用?是在pool.apply_async()…完成进程的所有作业之后吗?如果这些helper函数是在作用域(?)内调用的,不是应该更快吗第一个函数pool.apply_async()接受的(在本例中,X())是什么?如果不是,我应该把helper函数放在results()中吗?在

其他相关的想法:守护进程为什么什么都没有出现?我也很困惑如何排队,如果这就是问题所在。This seems like a place to start learning it,但是当使用apply_async时,是否可以安全地忽略队列,或者只在明显的时间效率低下时?在


Tags: to函数helperasyncreturndefcallbackfunction
1条回答
网友
1楼 · 发布于 2024-10-01 07:51:19

你问的是一大堆不同的问题,所以我会尽我所能把这些都说出来:

一旦工作进程返回结果,传递给callback的函数将在主进程(而不是工作进程)中执行。它在Pool对象内部创建的线程中执行。该线程使用result_queue中的对象,该对象用于从所有工作进程获取结果。在线程将结果从队列中取出后,它执行callback。在执行回调时,无法从队列中提取其他结果,因此回调必须快速完成。在您的示例中,一旦通过apply_asyncXY的一个调用完成,工作进程将把结果放入result_queue中,然后结果处理线程将从result_queue中提取结果,并执行您的{}。在

其次,我怀疑您没有看到示例代码发生任何事情的原因是因为您的所有worker函数调用都失败了。如果工作函数失败,callback将永远不会执行。除非您尝试从调用apply_async返回的^{}对象获取结果,否则根本不会报告失败。但是,由于您没有保存这些对象中的任何一个,您永远不会知道发生了故障。如果我是您,我会在您测试时尝试使用pool.apply,这样您就可以在错误发生时立即看到它们。在

worker失败的原因(至少在您提供的示例代码中)是因为X和{}被定义为另一个函数中的函数。multiprocessing将函数和对象传递给工作进程,方法是在主进程中对它们进行酸洗,然后在工作进程中取消对它们的拾取。在其他函数中定义的函数不可选择,这意味着multiprocessing将无法在工作进程中成功取消拾取它们。要解决此问题,请在模块的顶层定义这两个函数,而不是嵌入dirwalker函数。在

您应该继续从X和{}调用Z,而不是在results中。这样,Z可以在所有工作进程中并发运行,而不必在主进程中一次运行一个调用。记住,你的callback函数应该尽可能快,这样就不会耽误处理结果。在那里执行Z会减慢速度。在

下面是一些与您所做的类似的简单示例代码,希望能让您了解您的代码应该是什么样子:

import multiprocessing as mp
import os

# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f): 
    fileinfo = Z(f) 
    return fileinfo 

# Y() reads other types of files and does the same thing
def Y(f): 
    fileinfo = Z(f)
    return fileinfo

# helper function
def Z(arr):
    return arr + "zzz"

def dirwalker(directory):
    ahlala = []

    # results() is the callback function
    def results(r):
        ahlala.append(r) # or .append, haven't yet decided

    for _,_,files in os.walk(directory):
        pool = mp.Pool(mp.cpu_count())
        for f in files:
            if len(f) > 5: # Just an arbitrary thing to split up the list with
                pool.apply_async(X, args=(f,), callback=results)  # ,error_callback=handle_error # In Python 3, there's an error_callback you can use to handle errors. It's not available in Python 2.7 though :(
            else:
                pool.apply_async(Y, args=(f,), callback=results)

    pool.close()
    pool.join()
    return ahlala


if __name__ == "__main__":
    print(dirwalker("/usr/bin"))

输出:

^{pr2}$

编辑:

您可以使用multiprocessing.Manager类创建在父进程和子进程之间共享的dict对象:

pool = mp.Pool(mp.cpu_count())
m = multiprocessing.Manager()
helper_dict = m.dict()
for f in files:
    if len(f) > 5:
        pool.apply_async(X, args=(f, helper_dict), callback=results)
    else:
        pool.apply_async(Y, args=(f, helper_dict), callback=results)

然后让XY接受第二个名为helper_dict(或任何你想要的名字)的参数,你就一切就绪了。在

需要注意的是,这是通过创建一个包含普通dict的服务器进程来实现的,并且所有其他进程都通过代理对象与该dict进行通信。所以每次你读或写dict,你都在做IPC。这使它比真正的口述慢得多

相关问题 更多 >