为并发数据加载器提供干净的pythonic方式?

2024-09-30 01:35:20 发布

您现在位置:Python中文网/ 问答频道 /正文

Python 3

我想知道一个真正干净的pythonic并发数据加载器应该是什么样子。我的一个项目需要这种方法,该项目对太大而无法完全放入内存的数据进行大量计算。因此,我实现了应该并发运行并将数据存储在队列中的数据加载器,以便主进程可以在(同时)加载和准备下一个数据时工作。当然,当队列为空(主进程尝试使用更多项->;队列应等待新数据)或已满(工作进程应等待主进程使用队列中的数据以防止内存不足错误)时,该队列应阻塞。你知道吗

我使用Python的multiprocessing模块(multiprocessing.Queuemultiprocessing.Process)编写了一个类来满足这个需求。该类的关键部分实现如下:

import multiprocessing as mp
from itertools import cycle    

class ConcurrentLoader:
    def __init__(path_to_data, queue_size, batch_size):
        self._batch_size
        self._path = path_to_data
        filenames = ... # filenames for path 'path_to_data',
                        # get loaded using glob
        self._files = cycle()
        self._q = mp.Queue(queue_size)
        ...
        self._worker = mp.Process(target=self._worker_func, daemon=True)
        self._worker.start() # only started, never stopped

    def _worker_func(self):
        while True:
            buffer = list()
            for i in range(batch_size):
                f = next(self._files)
                ... # load f and do some pre-processing with NumPy
                ... # add it to buffer
            self._q.put(np.array(buffer).astype(np.float32))

    def get_batch_data(self):
        self._q.get()

这个类有更多的方法,但它们都是为了“方便功能”。例如,它在dict中计算每个文件的加载频率、整个数据集的加载频率等等,但是这些都很容易在Python中实现,并且不会浪费太多计算时间(set、dict等等)。你知道吗

另一方面,由于I/O和预处理,数据部分本身甚至需要几秒钟。这就是为什么我希望这同时发生。你知道吗

ConcurrentLoader应该:

  • 阻止主进程:如果调用了get_batch_data,但队列为空
  • 块工作进程:如果队列已满,则防止内存不足错误和while True浪费资源
  • 对任何使用ConcurrentLoader的类都要“透明”:它们应该只提供数据的路径并使用get_batch_data,而不会注意到这实际上是并发工作的(“无障碍使用”)
  • 在主进程死亡时终止其工作进程以再次释放资源

考虑到这些目标(我忘了什么吗?)我应该做些什么来加强当前的实施?它是线程/死锁安全的吗?有没有一种更“pythonic”的实现方式?我能把它弄干净点吗?是不是浪费了资源?你知道吗

任何使用ConcurrentLoader的类都大致遵循以下设置:

class Foo:
    ...

    def do_something(self):
        ...
        data1 = ConcurrentLoader("path/to/data1", 64, 8)
        data2 = ConcurrentLoader("path/to/data2", 256, 16)
        ...
        sample1 = data1.get_batch_data()
        sample2 = data2.get_batch_data()
        ... # heavy computations with data contained in 'sample1' & 'sample2'
            # go *here*

请指出任何类型的错误,以改善我的方法或提供一个自己的,更干净,更python的方法。你知道吗


Tags: to数据path方法selfdatasizeget
1条回答
网友
1楼 · 发布于 2024-09-30 01:35:20
  • multiprocessing.Queue为空/满且 get()/put()被自动调用。

  • 这种行为对调用函数是透明的。

  • self._worker.daemon = True之前使用self._worker.start(),这样当主进程退出时,工作进程将自动终止

相关问题 更多 >

    热门问题