在调用iterable(?)之间,apply_async
流是如何工作的函数和回调函数?在
设置:我正在读取2000文件目录中所有文件的一些行,有些有数百万行,有些只有几行。提取一些头/格式/日期数据以对每个文件进行字符化。这是在一台16 CPU的机器上完成的,所以对它进行多处理是有意义的。在
目前,预期的结果被发送到一个列表(ahlala
),这样我就可以打印出来;稍后,它将被写入*.csv。这是我代码的一个简化版本,最初基于this非常有用的帖子。在
import multiprocessing as mp
def dirwalker(directory):
ahlala = []
# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f):
fileinfo = Z(arr_of_lines)
return fileinfo
# Y() reads other types of files and does the same thing
def Y(f):
fileinfo = Z(arr_of_lines)
return fileinfo
# results() is the callback function
def results(r):
ahlala.extend(r) # or .append, haven't yet decided
# helper function
def Z(arr):
return fileinfo # to X() or Y()!
for _,_,files in os.walk(directory):
pool = mp.Pool(mp.cpu_count()
for f in files:
if (filetype(f) == filetypeX):
pool.apply_async(X, args=(f,), callback=results)
elif (filetype(f) == filetypeY):
pool.apply_async(Y, args=(f,), callback=results)
pool.close(); pool.join()
return ahlala
注意,如果我将所有的帮助函数Z()
放入X()
、Y()
或{pool.apply_async()
…完成进程的所有作业之后吗?如果这些helper函数是在作用域(?)内调用的,不是应该更快吗第一个函数pool.apply_async()
接受的(在本例中,X()
)是什么?如果不是,我应该把helper函数放在results()
中吗?在
其他相关的想法:守护进程为什么什么都没有出现?我也很困惑如何排队,如果这就是问题所在。This seems like a place to start learning it,但是当使用apply_async
时,是否可以安全地忽略队列,或者只在明显的时间效率低下时?在
你问的是一大堆不同的问题,所以我会尽我所能把这些都说出来:
一旦工作进程返回结果,传递给}。在
callback
的函数将在主进程(而不是工作进程)中执行。它在Pool
对象内部创建的线程中执行。该线程使用result_queue
中的对象,该对象用于从所有工作进程获取结果。在线程将结果从队列中取出后,它执行callback
。在执行回调时,无法从队列中提取其他结果,因此回调必须快速完成。在您的示例中,一旦通过apply_async
对X
或Y
的一个调用完成,工作进程将把结果放入result_queue
中,然后结果处理线程将从result_queue
中提取结果,并执行您的{其次,我怀疑您没有看到示例代码发生任何事情的原因是因为您的所有worker函数调用都失败了。如果工作函数失败,} 对象获取结果,否则根本不会报告失败。但是,由于您没有保存这些对象中的任何一个,您永远不会知道发生了故障。如果我是您,我会在您测试时尝试使用
callback
将永远不会执行。除非您尝试从调用apply_async
返回的^{pool.apply
,这样您就可以在错误发生时立即看到它们。在worker失败的原因(至少在您提供的示例代码中)是因为}被定义为另一个函数中的函数。
X
和{multiprocessing
将函数和对象传递给工作进程,方法是在主进程中对它们进行酸洗,然后在工作进程中取消对它们的拾取。在其他函数中定义的函数不可选择,这意味着multiprocessing
将无法在工作进程中成功取消拾取它们。要解决此问题,请在模块的顶层定义这两个函数,而不是嵌入dirwalker
函数。在您应该继续从}调用
X
和{Z
,而不是在results
中。这样,Z
可以在所有工作进程中并发运行,而不必在主进程中一次运行一个调用。记住,你的callback
函数应该尽可能快,这样就不会耽误处理结果。在那里执行Z
会减慢速度。在下面是一些与您所做的类似的简单示例代码,希望能让您了解您的代码应该是什么样子:
输出:
^{pr2}$编辑:
您可以使用
multiprocessing.Manager
类创建在父进程和子进程之间共享的dict对象:然后让
X
和Y
接受第二个名为helper_dict
(或任何你想要的名字)的参数,你就一切就绪了。在需要注意的是,这是通过创建一个包含普通dict的服务器进程来实现的,并且所有其他进程都通过代理对象与该dict进行通信。所以每次你读或写dict,你都在做IPC。这使它比真正的口述慢得多
相关问题 更多 >
编程相关推荐