import multiprocessing as mp
import time

def worker(value):
    # The generator is defined inside the multiprocessed function
    def gen():
        for k in range(value):
            time.sleep(1)  # simulate a long-running task
            yield k
    # Execute the generator
    for x in gen():
        print(x)
        # Do something with x?

if __name__ == "__main__":
    pool = mp.Pool()
    pool.map(worker, [2, 5, 2])
    pool.close()  # Stop accepting new work
    pool.join()   # Wait for all the work to be finished
Let’s ignore that problem for a moment and look at what we would need to do to pickle a generator. Since a generator is essentially a souped-up function, we would need to save its bytecode, which is not guaranteed to be backward-compatible between Python versions, and its frame, which holds the state of the generator such as local variables, closures and the instruction pointer. The latter is rather cumbersome to accomplish, since it basically requires making the whole interpreter picklable. So any support for pickling generators would require a large number of changes to CPython’s core.

Now, if an object unsupported by pickle (e.g., a file handle, a socket, a database connection, etc.) occurs in the local variables of a generator, then that generator could not be pickled automatically, regardless of any pickle support for generators we might implement. So in that case, you would still need to provide custom __getstate__ and __setstate__ methods. This problem renders any pickling support for generators rather limited.
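To see the limitation concretely, here is a minimal sketch (the function name is illustrative) showing that pickle refuses a live generator:

```python
import pickle

def count_up(n):
    for i in range(n):
        yield i

gen = count_up(3)
next(gen)  # advance the generator so it has live frame state
try:
    pickle.dumps(gen)
except TypeError as e:
    print("cannot pickle:", e)
```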
He also proposed a solution: use simple iterators.

The best solution to this problem is to rewrite the generators as simple iterators (i.e., ones with a __next__ method). Iterators are easy and space-efficient to pickle because their state is explicit. You would still need to handle objects representing some external state explicitly, however; you cannot get around this.
import multiprocessing

# gens is assumed to be a list of generator functions defined elsewhere
M = len(gens)
N = multiprocessing.cpu_count()

def proc(gen_idx):
    # The generator is only created here, inside the worker process
    return [r for r in gens[gen_idx]()]

if __name__ == "__main__":
    with multiprocessing.Pool(N) as p:
        for r in p.imap_unordered(proc, range(M)):
            print(r)
If your subtasks are truly parallel (they do not depend on any shared state), you can do this with multiprocessing.Pool(); see https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool

This requires making the arguments you pass to pool.map() serializable. You cannot pass a generator to a worker process, but you can achieve something similar by defining the generator inside the target function and passing only its initialization arguments through the multiprocessing library:
The output will be the numbers yielded by each worker’s generator, interleaved across the processes since they run concurrently.

Note that this solution only really works if you build the generators and then use each of them exactly once, because their final state is lost when the worker function ends.

Keep in mind that any time you want to use multiprocessing, you must stick to serializable objects because of the constraints of inter-process communication; this often proves limiting.

If your processing is not CPU-bound but I/O-bound (disk access, network access, etc.), it is much easier to use threads instead.
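A brief sketch of that threaded alternative, assuming the per-item work is I/O-bound (the function names here are illustrative): threads share one address space, so generator objects can be handed to the workers directly, with no pickling involved.

```python
from concurrent.futures import ThreadPoolExecutor

def gen(n):
    for k in range(n):
        yield k

def consume(g):
    # Stand-in for I/O work done on each yielded item
    return sum(g)

# Generators are passed to the threads as ordinary objects
with ThreadPoolExecutor(max_workers=3) as ex:
    results = list(ex.map(consume, [gen(2), gen(5), gen(2)]))
print(results)  # [1, 10, 1]
```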
You cannot pickle generators. Read more about it here.

A blog post explains this in more detail. To quote a passage from it:
Another suggested solution (which I haven't tried) is this: convert the generator into a class in which the generator code becomes the __iter__ method, and add __getstate__ and __setstate__ methods to the class to handle pickling. Keep in mind that file objects cannot be pickled, so __setstate__ will have to re-open any files, as needed.

You don't need to pickle the generators themselves; just send each generator's index to the process pool.
Note that I don't call/initialize the generators until inside the processing function. Using imap_unordered will let you process the results as each generator completes.