
2024-09-30

def work(aggregates, genput):
    '''Fold each pixel of one single-channel image into that channel's aggregate.

    ``genput`` is a ``(channel, image)`` item from the generator; only its
    first two elements are read. ``image`` is walked row by row, pixel by
    pixel, and after every pixel ``aggregates[channel]`` is replaced by
    ``update(aggregates[channel], pixel)``. ``update`` is defined elsewhere
    (presumably a single Welford step -- confirm). The aggregates are later
    finalized into per-channel means and variances.

    NOTE(review): when ``aggregates`` is a ``Manager().list()`` proxy shared by
    pool workers, every pixel costs a proxy read plus a proxy write (each a
    round trip to the manager process). The read-modify-write is also not
    atomic, so two workers handling the same channel at once can overwrite
    each other's updates.
    '''
    chan, img = genput[0], genput[1]
    pixels = (px for line in img for px in line)
    for px in pixels:
        aggregates[chan] = update(aggregates[chan], px)

def data_stream(df, data_root):
    '''Yield ``(channel, image)`` for every channel of the TIFF read per row of ``df``.

    For each row produced by ``df.iterrows()`` the file is loaded with
    ``imread`` and split along its first axis. Each slice is yielded together
    with its position along that axis.

    NOTE(review): the path passed to ``imread`` is always ``data_root``, so the
    row contents are ignored and the same file is re-read once per row.
    Presumably the path should be built from ``data_root`` plus a per-row
    field -- confirm.
    '''
    for _row_label, _row in df.iterrows():
        # original comment says a 33x64x64 stack -- TODO confirm
        stack = imread(data_root)
        yield from enumerate(stack)

# Single pass over every image: accumulate each channel's mean/variance across the whole image set
def preprocess_mv(df, data_root, channels=33, multiprocessing=True,
                  processes=8, chunksize=5000):
    '''Calculates mean and variance on the whole image set for use in deep_learn.

    Streams every ``(channel, image)`` item from ``data_stream`` through a
    process pool running ``work``. ``work`` folds pixels into one aggregate
    per channel, held in a ``Manager().list()``.

    Args:
        df: Table iterated by ``data_stream`` (one entry per image).
        data_root: Passed through unchanged to ``data_stream``.
        channels: Number of per-channel aggregates to allocate.
        multiprocessing: Currently ignored; a pool is always used. Kept so
            existing callers keep working.
        processes: Pool worker count (defaults to the former hard-coded 8).
        chunksize: ``imap`` chunk size (defaults to the former hard-coded 5000).

    Returns:
        A plain ``list`` snapshot of the per-channel aggregates, each started
        at ``[0, 0, 0]``. It is taken after every task has finished and before
        the manager shuts down, ready to be finalized into means/variances.

    NOTE(review): ``work`` updates ``aggregates[channel]`` with an unlocked
    read-modify-write through the proxy, so workers that handle the same
    channel concurrently can lose updates.
    '''
    # Context managers guarantee the manager server process and the pool are
    # shut down; previously neither was ever closed.
    with Manager() as manager:
        aggregates = manager.list([[0, 0, 0] for _ in range(channels)])
        proxy = partial(work, aggregates)

        with Pool(processes=processes) as pool:
            # imap is lazy. Its iterator used to be discarded, so this function
            # could return before any image was processed, and worker errors
            # were never raised. Draining it blocks until every task is done
            # and re-raises the first worker exception here.
            # data_stream itself is consumed in this (parent) process, so the
            # imread calls and the pickling of each image to a worker happen
            # here as well.
            for _ in pool.imap(proxy, data_stream(df, data_root),
                               chunksize=chunksize):
                pass

        # finalize data below
        # A single slice call copies the whole list out of the manager; the
        # proxy stops working once the manager context exits.
        return aggregates[:]

我怀疑主要瓶颈在于：对 aggregates 数组进行 pickle 序列化、并在父进程与子进程之间来回传输所花的时间非常长。我能看出这一开销完全抵消了多进程带来的优势，因为每个子进程都必须等待其他子进程完成数据的序列化（pickle）和反序列化（unpickle）。我读到这是 multiprocessing 库的一个局限；从这里的其他帖子来看，我意识到这可能已经是我能做到的最好程度了。话虽如此，有没有人知道该如何改进这一点，或者有什么建议？


def init(arr):
    '''Pool initializer: publish ``arr`` as the module-global ``aggregates``.

    ``Pool(initializer=init, initargs=(shared,))`` calls this once in each
    worker process when it starts. Later ``work`` calls in that worker can
    then reach the shared array without it being sent with every task.
    '''
    globals()['aggregates'] = arr

def work(genput):
    '''Fold every pixel of one image into the shared aggregate of its channel.

    Runs in a pool worker whose initializer stored the shared array in the
    module-global ``aggregates``. The array is flat ("shaped" as channels x 3):
    channel ``c`` owns slots ``3*c``, ``3*c + 1`` and ``3*c + 2``.

    Args:
        genput: ``(sample_no, channel, image)`` as yielded by the generator;
            ``sample_no`` is not used here. ``image`` is iterated row by row,
            pixel by pixel.

    Assumes ``aggregates`` is a synchronized ``multiprocessing.Array``
    (created with the default ``lock=True``), which provides ``get_lock()``.
    '''
    channel = genput[1]
    image = genput[2]
    base = 3 * channel

    # The read-fold-write must be atomic. Otherwise two workers holding images
    # of the same channel both start from the same aggregate, and one silently
    # overwrites the other's result.
    # This serializes the fold across workers. Regaining parallelism would mean
    # folding each image from a zero aggregate outside the lock and merging
    # under it, which needs a merge rule for ``update``'s aggregate layout
    # (not shown here).
    with aggregates.get_lock():
        agg = (aggregates[base], aggregates[base + 1], aggregates[base + 2])
        for row in image:
            for pixel in row:
                # use welford's to compute the updated aggregate
                agg = update(agg, pixel)
        # Write back the running value rather than a variable bound only
        # inside the loop, so an empty image leaves the aggregate unchanged
        # instead of raising UnboundLocalError.
        aggregates[base] = agg[0]
        aggregates[base + 1] = agg[1]
        aggregates[base + 2] = agg[2]

def data_stream(df, data_root):
    '''Yield ``(index, channel, image)`` for every channel of the TIFF read per row of ``df``.

    ``index`` is the row label from ``df.iterrows()``. ``channel`` is the
    position along the first axis of the array returned by ``imread``, and
    ``image`` is the slice at that position.

    NOTE(review): the path passed to ``imread`` is always ``data_root``, so the
    row contents are ignored and the same file is re-read once per row.
    Presumably the path should be derived from the row -- confirm.
    '''
    for index, _sample in df.iterrows():
        # read the image with all channels
        tif = imread(data_root)
        for channel, image in enumerate(tif):
            yield (index, channel, image)

if __name__ == '__main__':

    channels = 33
    # One flat (count-of-3) slot group per channel: 3 * 33 = 99 doubles.
    # An integer size makes Array zero-initialise itself. The default
    # lock=True wraps it with a recursive lock, reachable in workers through
    # get_lock().
    aggs = Array('d', 3 * channels)

    # NOTE(review): df and data_root are not defined in this snippet --
    # presumably loaded earlier in the script.
    with Pool(initializer=init, initargs=(aggs,), processes=8) as pool:
        # imap is lazy. Drain it so every image is processed, and any worker
        # exception is re-raised here, before the pool is torn down on exit.
        for _ in pool.imap(work, data_stream(df, data_root), chunksize=10):
            pass

    # finalize aggregates below

