I am trying to process a large CSV file (over 10 GB). I am using the code snippet below to implement a producer-consumer-writer pattern, with multiprocessing to separate the I/O from the computation and speed things up. But my code seems to take almost the same time as the single-process version, and replacing block=True with block=False makes it run slower than a single process. What am I doing wrong? My code:
import csv
import multiprocessing
import time
from multiprocessing import Process, Queue
from queue import Empty  # raised by get(block=False) on an empty queue

def data_to_features_par(my_file, features_file):

    def feature_processor(row):
        time.sleep(20)  # placeholder for the real, time-consuming computation
        return row      # placeholder result so the pipeline runs end to end

    def feed(queue, my_file, number_of_consumers):
        # Producer: stream rows from the input CSV into the work queue.
        with open(my_file) as sp:
            reader = csv.reader(sp)
            for row in reader:
                queue.put(row, block=True)
        # One sentinel per consumer so every worker knows when to stop.
        for x in range(number_of_consumers):
            queue.put(None)

    def calc(queueIn, queueOut):
        # Consumer: pull rows, compute features, push results to the writer.
        counter = 0
        while True:
            try:
                par = queueIn.get(block=True)
                counter += 1
                if par is None:
                    queueOut.put(None)
                    break
                res = feature_processor(par)  # time-consuming computations
                queueOut.put(res)
            except Empty:  # never raised with block=True and no timeout
                pass

    def write(queue, output_file):
        # Writer: drain the result queue into the output CSV.
        with open(output_file, "w") as fp:
            writer = csv.writer(fp)
            while True:
                try:
                    res = queue.get(block=True)
                    if res is None:
                        break
                    writer.writerow(res)
                except Empty:
                    pass

    cpu_count = multiprocessing.cpu_count()
    workerQueue = Queue()
    writerQueue = Queue()
    calc_proc_count = cpu_count - 2  # reserve cores for the feeder and the writer

    feedProc = Process(target=feed, args=(workerQueue, my_file, calc_proc_count))
    calcProc = [Process(target=calc, args=(workerQueue, writerQueue))
                for _ in range(calc_proc_count)]
    writProc = Process(target=write, args=(writerQueue, features_file))

    feedProc.start()
    for p in calcProc:
        p.daemon = True
        p.start()
    writProc.start()

    try:
        feedProc.join()
        for p in calcProc:
            p.join()
        writProc.join()
    except KeyboardInterrupt:
        feedProc.terminate()
        for p in calcProc:
            p.terminate()
        writProc.terminate()
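For reference, the single-process version I am comparing against is essentially the same read-compute-write pipeline in one loop. The exact code is not shown here, so this is only a simplified sketch using the same placeholder feature_processor (data_to_features_seq is just an illustrative name):

import csv
import time

def feature_processor(row):
    time.sleep(20)  # same placeholder computation as above
    return row

def data_to_features_seq(my_file, features_file):
    # Everything in one process: read a row, compute, write the result.
    with open(my_file) as sp, open(features_file, "w") as fp:
        reader = csv.reader(sp)
        writer = csv.writer(fp)
        for row in reader:
            writer.writerow(feature_processor(row))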
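On the block flag itself, my understanding is: get(block=True) waits until an item is available, while get(block=False) returns at once and raises queue.Empty when nothing is queued, so a loop like mine ends up spinning through the except branch. A tiny illustrative sketch:

from multiprocessing import Queue
from queue import Empty

q = Queue()
q.put("item")

print(q.get(block=True))   # blocks until an item arrives, then returns it

try:
    q.get(block=False)     # returns immediately; raises Empty on an empty queue
except Empty:
    print("empty: non-blocking get() did not wait")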