Python multiprocessing producer/consumer/writer using queues

Posted 2024-09-27 09:31:26


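I'm trying to process large CSV files (over 10 GB). I use the code snippet below to implement a producer-consumer-writer pattern, using multiprocessing to separate I/O from computation and speed up the computation. However, my code takes almost the same time as the single-process version. Replacing `block=True` with `block=False` makes it run even slower than a single process. What am I doing wrong?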
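The `block=True` / `block=False` difference comes down to how `Queue.get` waits. The minimal sketch below (the helper names `count_busy_polls` and `wait_blocking` are hypothetical, made up for illustration) shows that `get(block=False)` on an empty `multiprocessing.Queue` raises `queue.Empty` immediately. A loop that catches `Empty` and retries therefore never sleeps and keeps a CPU core busy, which may compete with the worker processes. By contrast, `get(block=True)`, optionally with a `timeout`, waits without spinning.

```python
import multiprocessing
import time
from queue import Empty  # multiprocessing.Queue raises the standard queue.Empty


def count_busy_polls(q, seconds):
    """Poll q with get(block=False) for `seconds`; return how many polls found it empty."""
    empty_polls = 0
    deadline = time.monotonic() + seconds
    while time.monotonic() < deadline:
        try:
            q.get(block=False)  # returns or raises immediately, never sleeps
        except Empty:
            empty_polls += 1  # loop straight back: this is a busy-wait
    return empty_polls


def wait_blocking(q, seconds):
    """get(block=True) with a timeout sleeps until an item arrives or time runs out."""
    try:
        return q.get(block=True, timeout=seconds)
    except Empty:
        return None


if __name__ == "__main__":
    q = multiprocessing.Queue()
    print("empty polls in 0.1 s:", count_busy_polls(q, 0.1))
    print("blocking get result:", wait_blocking(q, 0.1))
```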

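import csv
import multiprocessing
import time
from multiprocessing import Process, Queue
from queue import Empty


# Process targets live at module level so they can be pickled under the
# "spawn" start method (the default on Windows and macOS).
def feature_processor(row):
    time.sleep(20)  # stand-in for the time-consuming computation
    return row      # the writer needs a row back; writerow(None) would raise


def feed(queue, my_file, number_of_consumers):
    with open(my_file, newline="") as sp:
        reader = csv.reader(sp)
        for row in reader:
            queue.put(row, block=True)
    for _ in range(number_of_consumers):
        queue.put(None)  # one stop sentinel per consumer


def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block=True)
        except Empty:  # only raised with block=False, in which case this loop spins
            continue
        if par is None:
            queueOut.put(None)  # tell the writer this consumer is done
            break
        res = feature_processor(par)  # time-consuming computations
        queueOut.put(res)


def write(queue, output_file, number_of_consumers):
    finished = 0
    with open(output_file, "w", newline="") as fp:
        writer = csv.writer(fp)
        # Stop only after every consumer has sent its sentinel; stopping at the
        # first None would drop rows that other consumers are still producing.
        while finished < number_of_consumers:
            try:
                res = queue.get(block=True)
            except Empty:
                continue
            if res is None:
                finished += 1
                continue
            writer.writerow(res)


def data_to_features_par(my_file, features_file):
    cpu_count = multiprocessing.cpu_count()
    workerQueue = Queue()
    writerQueue = Queue()
    calc_proc_count = max(1, cpu_count - 2)  # at least one consumer on small machines
    feedProc = Process(target=feed, args=(workerQueue, my_file, calc_proc_count))
    calcProc = [Process(target=calc, args=(workerQueue, writerQueue))
                for _ in range(calc_proc_count)]
    writProc = Process(target=write, args=(writerQueue, features_file, calc_proc_count))

    feedProc.start()
    for p in calcProc:
        p.daemon = True
        p.start()
    writProc.start()

    try:
        feedProc.join()
        for p in calcProc:
            p.join()
        writProc.join()
    except KeyboardInterrupt:
        feedProc.terminate()
        for p in calcProc:
            p.terminate()
        writProc.terminate()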
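A commonly suggested change for this kind of pipeline is to pass rows through the queues in batches rather than one at a time: every `put`/`get` on a `multiprocessing.Queue` pickles its item and sends it through a pipe, so per-row messages can add noticeable overhead. The minimal sketch below keeps the same feed/calc/write layout under stated assumptions: `CHUNK_SIZE`, the `maxsize` of the bounded queues, the function name `data_to_features_chunked`, and the `upper()`-based `feature_processor` are hypothetical placeholders, not values or code from the question. Output rows are not guaranteed to stay in input order, because whichever worker finishes a batch first gets written first.

```python
import csv
import multiprocessing as mp
from itertools import islice

CHUNK_SIZE = 1000  # rows per queue message; hypothetical value, tune for your data


def feature_processor(row):
    # Hypothetical placeholder for the real, expensive per-row computation.
    return [field.strip().upper() for field in row]


def feed(work_q, in_path, n_workers, chunk_size):
    with open(in_path, newline="") as fp:
        reader = csv.reader(fp)
        while True:
            chunk = list(islice(reader, chunk_size))  # next batch of up to chunk_size rows
            if not chunk:
                break
            work_q.put(chunk)  # one pickle + pipe write per batch instead of per row
    for _ in range(n_workers):
        work_q.put(None)  # one stop sentinel per worker


def calc(work_q, out_q):
    while True:
        chunk = work_q.get()  # blocks (no busy-waiting) until a batch arrives
        if chunk is None:
            out_q.put(None)  # tell the writer this worker has finished
            break
        out_q.put([feature_processor(row) for row in chunk])


def write(out_q, out_path, n_workers):
    finished = 0
    with open(out_path, "w", newline="") as fp:
        writer = csv.writer(fp)
        while finished < n_workers:  # exit only after every worker's sentinel
            rows = out_q.get()
            if rows is None:
                finished += 1
            else:
                writer.writerows(rows)


def data_to_features_chunked(in_path, out_path, chunk_size=CHUNK_SIZE):
    n_workers = max(1, mp.cpu_count() - 2)
    # Bounded queues apply backpressure: feed() blocks on put() when the
    # workers fall behind, so the reader cannot run far ahead of them.
    work_q = mp.Queue(maxsize=4 * n_workers)
    out_q = mp.Queue(maxsize=4 * n_workers)
    procs = [mp.Process(target=feed, args=(work_q, in_path, n_workers, chunk_size))]
    procs += [mp.Process(target=calc, args=(work_q, out_q)) for _ in range(n_workers)]
    procs.append(mp.Process(target=write, args=(out_q, out_path, n_workers)))
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

To try it, call `data_to_features_chunked("input.csv", "features.csv")` (hypothetical file names) from under an `if __name__ == "__main__":` guard: with the spawn start method, child processes re-import the main module, and the guard keeps them from launching the pipeline again.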
