如何并行处理相同的图像以避免不必要的拷贝?

2024-09-21 07:51:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用PySpark处理图像数据。我的图像存储在Amazon S3中,我的数据结构将每个图像与多边形列表相关联(例如,最多可能有500个多边形,这是数据倾斜的来源):

image_path_poly_list = [(ImagePath1, [poly1_1, poly1_2, ..., poly1_N]), ..., (ImagePathM, [polyM_1, polyM_2, ..., polyM_K])]

每个图像可以包含一个或多个通道(例如,仅红色或红色、绿色、蓝色),并且每个通道可以高达11000 x 11000像素。换句话说,每个通道可能包含多达500 mb的数据。因此,一幅图像可能占用超过1.5gb的空间(考虑到3个通道,“普通”情况)。图像是.jp2格式的(实际上是非常压缩的),解压非常慢,而且需要cpu。你知道吗

我想剪辑每个图像与各自的多边形。由于图像太大,无法放入Spark驱动程序,我最好下载/打开Spark驱动程序中的图像。因此,我从image_path_poly_list创建一个RDD,然后执行一个map操作来下载/打开图像。你知道吗

rdd1 -> [(Image1, [poly1_1, poly1_2, ..., poly1_N]), ..., (ImageM, [polyM_1, polyM_2, ..., polyM_K])]

接下来,我做一个平面图来得到一个新的成对rdd:

rdd2 -> (Image1, poly1_1), (Image1, poly1_2), ..., (ImageM, polyM_K)

然后我做了一个地图来剪辑每一对图像和多边形。下面是我的代码:

def openImage(elem):
    imagePath, list_of_poly = elem 
    image = downloadAndOpen(imagePath) # download from S3 and load into memory  
    return image, list_of_poly 

def expand(elem):
    image, list_of_poly = elem
    return [(image, poly) for poly in list_of_poly]

def process(elem):
    image, poly = elem
    return clip(image, elem) # clip returns the clipped image

rdd1 = sc.parallelize(image_path_poly_list).map(openImage)
rdd2 = rdd1.flatmap(expand).map(process).collect()

但是,我注意到,我使用的集群几乎处于空闲状态,一段时间后只使用了几个核心,就像数据倾斜一样。我能在数据中找到的唯一的歪斜来源是每个图像可以拥有的多边形的可变数量。我认为flatMap将重新平衡工作负载,因为一对(图像,多边形)的处理时间几乎是恒定的。你知道吗

然后,经过一些网络研究,我认为在flatMap之后,所有对(图像,多边形)仍然位于同一个分区中,并且每个分区(据我所知)由一个内核串行处理。因此,我尝试用一个巨大的numPartitions重新分区。但是,这非常缓慢,因为图像会缓存到磁盘中:

rdd2 = rdd1.flatmap(expand).repartition(hugeNumPartitions).map(process).collect()

有没有一种方法可以做一些类似于平行的事情,这样我就可以使用同一个图像与多边形列表平行剪裁?我认为在map中使用多处理池可以完成这项工作,但我正在尝试避免这种情况,并寻找一种只使用Spark的解决方案。你知道吗

谢谢!你知道吗


Tags: of数据path图像imagemap多边形list

热门问题