我有一个简单的图形结构,它接受N个独立的任务,然后对它们进行聚合。我不在乎独立任务的结果按什么顺序聚合。有没有一种方法可以在依赖项可用时对其进行操作,从而加快计算速度
考虑下面的例子。在它中,每个并行任务都会等待一些随机时间,然后返回。另一个任务收集结果,形成一个有序队列。如果收集是异步进行的,则顺序将基于任务完成的时间。如果收集是同步进行的,则顺序将由输入静态定义
from multiprocessing import Pool
from dask import delayed
import numpy as np
from time import sleep
def wait(i):
"""Something embarrassingly parallel"""
np.random.seed()
t = np.random.uniform()
sleep(t)
print(i, t)
return i, t
def lineup(who_when):
"""Aggregate"""
order = []
for who, when in who_when:
print(f'who: {who}')
order.append(who)
return order
使用imap_unordered,我们可以看到在所有依赖项完成之前,收集/缩减会尽快开始
n = 5
pool = Pool(processes=n)
lineup(pool.imap_unordered(wait, range(n)))
# Produces something like the following
2 0.2837069069881948
4 0.44156753704276597
who: 2
who: 4
1 0.5563172244950703
0 0.6696008076879393
who: 1
who: 0
3 0.9911326214345308
who: 3
[2, 4, 1, 0, 3]
使用dask.delayed,按照我习惯的方式,结果就像map(),一旦所有依赖项都可用,就开始收集。顺序是静态的
n = 5
order = delayed(lineup)([delayed(wait)(i) for i in range(n)])
order.compute()
# produces something like:
0 0.2792789023871932
2 0.44570072028850705
4 0.6969597596416385
1 0.766705306208266
3 0.9889956337687371
who: 0
who: 1
who: 2
who: 3
who: 4
[0, 1, 2, 3, 4]
dask是否有imap_无序等效物?也许是用dask.bag
对。您可能正在寻找Dask Futures interface的as_completed函数
这里有一个关于Handling Evolving Workflows的Dask示例
为方便起见,我将复制此处填写的as_的文档字符串
竣工
按期货完成的顺序返回期货
这将返回一个迭代器,该迭代器按照输入对象完成的顺序生成输入对象。无论顺序如何,在迭代器上调用next都将阻塞,直到下一个future完成
此外,您还可以在使用.add方法计算期间向该对象添加更多未来
参数
期货:期货集合 按完成顺序迭代的未来对象的列表
结果为:bool(False) 是否等待并包括期货结果;在本例中,当_完成时,将产生(未来、结果)的元组
raise_错误:布尔(真) 当未来的结果引发例外时,我们是否应该提出;仅在结果为True时影响行为
例子
在计算过程中添加更多期货
也可以选择等待,直到收集到结果
相关问题 更多 >
编程相关推荐