使用dask.distributed强制或显式数据再平衡

2024-09-27 18:23:53 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个包含4个worker的Dask MPI集群,一个加载到Dask数组中的3D网格数据集,并分成4个块。我的应用程序要求每个工作者只运行一个任务,最好每个任务运行一个块。我遇到的麻烦是让这些块以可靠的、可复制的方式分布在集群中。特别是,如果我跑array.map\u块(foo),foo在同一个worker上为每个块运行。你知道吗

你知道吗客户端。重新平衡()似乎应该做我想做的事情,但它仍然将所有或大部分块留在同一个worker上。作为一个测试,我尝试将数据重新调整为128个块并再次运行,这会导致7或8个块移动到不同的数据集。这意味着Dask正在使用一种启发式方法来决定何时自动移动块,但没有给我一种强制均匀块分布的方法。你知道吗

下面是我一直在尝试的一个简单的测试脚本(连接到一个具有4个worker/列组的集群)。你知道吗

#connect to the Dask scheduler
from dask.distributed import Client, Sub, Pub, fire_and_forget
client = Client(scheduler_file='../scheduler.json', set_as_default=True)


#load data into a numpy array
import numpy as np
npvol = np.array(np.fromfile('/home/nleaf/data/RegGrid/Vorts_t50_128x128x128_f32.raw', dtype=np.float32))
npvol = npvol.reshape([128,128,128])

#convert numpy array to a dask array
import dask.array as da
ar = da.from_array(npvol).rechunk([npvol.shape[0], npvol.shape[1], npvol.shape[2]/N])


def test(ar):
    from mpi4py import MPI
    rank = MPI.COMM_WORLD.Get_rank()
    return np.array([rank], ndmin=3, dtype=np.int)

client.rebalance()
print(client.persist(ar.map_blocks(test, chunks=(1,1,1))).compute())

经过几十次测试运行,这段代码返回了一次秩3上的块,否则所有的块都返回秩0上。你知道吗


Tags: 数据fromimportclientasnp集群array
1条回答
网友
1楼 · 发布于 2024-09-27 18:23:53

由于您的总数据集没有那么大,对from\ array的初始调用只是创建一个块,因此它只属于一个worker(您可以用chunks=另外指定)。如果可能的话,下面的rechunk倾向于不移动数据。你知道吗

假设每个worker都可以访问您的文件,那么最好首先在worker中加载块。你知道吗

你需要这样的函数

def get_chunk(fn, offset, count, shape, dtype):
    with open(fn, 'rb') as f:
        f.seek(offset)
        return np.fromfile(f, dtype=dtype, count=count).reshape(shape)

并为每个块传递不同的偏移量。你知道吗

parts = [da.from_delayed(dask.delayed(get_chunk)(fn, offset, count, shape, dtype), shape, dtype) for offset in [...]]
arr = da.concat(parts)

这与Intake中的npy source自动执行的操作非常相似,代码:https://github.com/intake/intake/blob/master/intake/source/npy.py#L11

相关问题 更多 >

    热门问题