我想我遗漏了一些东西(仍然是Dask Noob),但我正在尝试批处理建议,以避免从此处执行太多Dask任务:
https://docs.dask.org/en/latest/delayed-best-practices.html
不能让他们工作。 这就是我所尝试的:
import dask
def f(x):
return x*x
def batch(seq):
sub_results = []
for x in seq:
sub_results.append(f(x))
return sub_results
batches = []
for i in range(0, 1000000000, 1000000):
result_batch = dask.delayed(batch, range(i, i + 1000000))
batches.append(result_batch)
批现在包含延迟对象:
batches[:3]
[Delayed(range(0, 1000000)),
Delayed(range(1000000, 2000000)),
Delayed(range(2000000, 3000000))]
但是当我计算它们时,我得到了批处理函数指针(我想??)地址:
results = dask.compute(*batches)
results[:3]
(<function __main__.batch(seq)>,
<function __main__.batch(seq)>,
<function __main__.batch(seq)>)
我有两个问题:
这真的应该这样运行吗,因为它似乎与Best practices
页面的第一行相矛盾,它说要而不是像delayed(f(x))
一样运行它,因为这样会立即运行,而不是懒惰。
如何获得上述批处理运行的结果?
目前没有回答
相关问题 更多 >
编程相关推荐