Python 3
我想知道一个真正干净的pythonic并发数据加载器应该是什么样子。我的一个项目需要这种方法,该项目对太大而无法完全放入内存的数据进行大量计算。因此,我实现了应该并发运行并将数据存储在队列中的数据加载器,以便主进程可以在(同时)加载和准备下一个数据时工作。当然,当队列为空(主进程尝试使用更多项->;队列应等待新数据)或已满(工作进程应等待主进程使用队列中的数据以防止内存不足错误)时,该队列应阻塞。你知道吗
我使用Python的multiprocessing
模块(multiprocessing.Queue
和multiprocessing.Process
)编写了一个类来满足这个需求。该类的关键部分实现如下:
import multiprocessing as mp
from itertools import cycle
class ConcurrentLoader:
def __init__(path_to_data, queue_size, batch_size):
self._batch_size
self._path = path_to_data
filenames = ... # filenames for path 'path_to_data',
# get loaded using glob
self._files = cycle()
self._q = mp.Queue(queue_size)
...
self._worker = mp.Process(target=self._worker_func, daemon=True)
self._worker.start() # only started, never stopped
def _worker_func(self):
while True:
buffer = list()
for i in range(batch_size):
f = next(self._files)
... # load f and do some pre-processing with NumPy
... # add it to buffer
self._q.put(np.array(buffer).astype(np.float32))
def get_batch_data(self):
self._q.get()
这个类有更多的方法,但它们都是为了“方便功能”。例如,它在dict中计算每个文件的加载频率、整个数据集的加载频率等等,但是这些都很容易在Python中实现,并且不会浪费太多计算时间(set、dict等等)。你知道吗
另一方面,由于I/O和预处理,数据部分本身甚至需要几秒钟。这就是为什么我希望这同时发生。你知道吗
ConcurrentLoader
应该:
get_batch_data
,但队列为空while True
浪费资源ConcurrentLoader
的类都要“透明”:它们应该只提供数据的路径并使用get_batch_data
,而不会注意到这实际上是并发工作的(“无障碍使用”)考虑到这些目标(我忘了什么吗?)我应该做些什么来加强当前的实施?它是线程/死锁安全的吗?有没有一种更“pythonic”的实现方式?我能把它弄干净点吗?是不是浪费了资源?你知道吗
任何使用ConcurrentLoader
的类都大致遵循以下设置:
class Foo:
...
def do_something(self):
...
data1 = ConcurrentLoader("path/to/data1", 64, 8)
data2 = ConcurrentLoader("path/to/data2", 256, 16)
...
sample1 = data1.get_batch_data()
sample2 = data2.get_batch_data()
... # heavy computations with data contained in 'sample1' & 'sample2'
# go *here*
请指出任何类型的错误,以改善我的方法或提供一个自己的,更干净,更python的方法。你知道吗
当
multiprocessing.Queue
为空/满且get()
/put()
被自动调用。这种行为对调用函数是透明的。
在
self._worker.daemon = True
之前使用self._worker.start()
,这样当主进程退出时,工作进程将自动终止相关问题 更多 >
编程相关推荐