Parallelizing a data iterator with Dask

Posted 2024-05-19 01:13:20


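I implemented a data iterator that takes items from two NumPy arrays and runs very CPU-intensive computations on them before returning them. I would like to parallelize this with Dask. Here is a simplified version of the iterator class: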

import numpy as np

class DataIterator:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        # Read from the instance's arrays, not the global x and y
        item1, item2 = self.x[idx], self.y[idx]
        # Do some very heavy computations here by
        # calling other methods and return
        return item1, item2

x = np.random.randint(20, size=(20,))
y = np.random.randint(50, size=(20,))

data_gen = DataIterator(x, y)

Right now I iterate over the items with a simple for loop, like this:

for i, (item1, item2) in enumerate(data_gen):
    print(item1, item2)

But this is really slow. Can someone help me parallelize it with Dask?


1 answer
User
Answer #1 · Posted 2024-05-19 01:13:20

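The simplest approach is to decorate the __getitem__ method with dask.delayed. Another approach is to convert x and y into Dask arrays and use Dask array operations inside __getitem__. Since you haven't described the heavy computation, the examples below use x.mean() and y.sum() as placeholders for it and are for illustration only.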

Using dask.delayed:

from dask import delayed
import numpy as np

class DataIterator:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

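    @delayed  # calling this method now returns a lazy Delayed object
    def __getitem__(self):
        # Placeholder computations on the instance's arrays (not the globals)
        item1 = self.x.mean()
        item2 = self.y.sum()
        # Do some very heavy computations here by
        # calling other methods and return
        return item1, item2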

x = np.random.randint(20, size=(20,))
y = np.random.randint(50, size=(20,))

data_gen = DataIterator(x, y)
x_mean, y_sum = data_gen.__getitem__().compute()

Output:

x_mean, y_sum
Out[41]: (8.45, 479)
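The delayed example above evaluates a single call. To run the per-item work for every index of the original iterator in parallel, one option is to wrap each __getitem__ call in dask.delayed and evaluate all of them together with dask.compute. This is a minimal sketch that assumes each item can be computed independently. The doubling and incrementing inside __getitem__ are hypothetical placeholders for the real heavy work. For delayed objects, dask.compute uses the threaded scheduler by default. That helps when the heavy work releases the GIL, as many NumPy routines do. For pure-Python CPU-bound work, passing scheduler="processes" to dask.compute may be faster.

```python
import dask
from dask import delayed
import numpy as np


class DataIterator:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        item1, item2 = self.x[idx], self.y[idx]
        # Hypothetical placeholder for the heavy per-item computation
        return item1 * 2, item2 + 1


rng = np.random.default_rng(0)
x = rng.integers(20, size=20)
y = rng.integers(50, size=20)
data_gen = DataIterator(x, y)

# Build one lazy task per index; nothing is computed yet
tasks = [delayed(data_gen.__getitem__)(i) for i in range(len(data_gen))]

# Execute all tasks together (threaded scheduler by default);
# returns a tuple with one (item1, item2) pair per index, in order
results = dask.compute(*tasks)

for item1, item2 in results:
    print(item1, item2)
```

Building the full task list before calling dask.compute once lets Dask schedule all items at the same time. Calling .compute() inside the loop would run them one after another.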

Using dask.array:

import dask.array as da
import numpy as np

class DataIterator:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

    def __getitem__(self):
        # Placeholder computations on the instance's Dask arrays (not the globals)
        item1 = self.x.mean()
        item2 = self.y.sum()
        # Do some very heavy computations here by
        # calling other methods and return
        return item1.compute(), item2.compute()

x = np.random.randint(20, size=(20,))
y = np.random.randint(50, size=(20,))

# Split each array into 4 chunks so Dask can process them in parallel
x = da.from_array(x, chunks=x.shape[0] // 4)
y = da.from_array(y, chunks=y.shape[0] // 4)

data_gen = DataIterator(x, y)
x_mean, y_sum = data_gen.__getitem__()

Output:

x_mean, y_sum
Out[50]: (10.4, 461)
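The dask.array example above reduces the whole arrays. If the heavy work is per element and can be written to operate on a whole NumPy block at a time, da.map_blocks can apply it chunk by chunk in parallel. This is a minimal sketch: heavy_block is a hypothetical vectorized stand-in for the real computation, and the chunk size of 5 is an arbitrary choice.

```python
import dask.array as da
import numpy as np


def heavy_block(x_block, y_block):
    # Hypothetical stand-in for the expensive work; it receives whole
    # NumPy blocks, so it must be written in vectorized form
    return x_block * 2 + y_block


rng = np.random.default_rng(0)
x = rng.integers(20, size=20)
y = rng.integers(50, size=20)

# Use identical chunking so block i of dx lines up with block i of dy
dx = da.from_array(x, chunks=5)
dy = da.from_array(y, chunks=5)

# Each pair of blocks becomes one task; compute() runs them in parallel
lazy = da.map_blocks(heavy_block, dx, dy, dtype=np.result_type(x, y))
result = lazy.compute()
print(result)
```

Each chunk becomes one task, so the chunk size controls the granularity of the parallelism. Fewer, larger chunks mean less scheduling overhead. More, smaller chunks can keep more cores busy.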
