为什么进程不在多进程中终止

2024-07-02 09:47:26 发布

您现在位置:Python中文网/ 问答频道 /正文

昨天,我使用python中的多处理处理处理了一个大约有2000万行的日志文件。你知道吗

  1. 启动一个名为“producer”的进程,逐行读取文件并将其放入队列。你知道吗
  2. 启动三个名为“consumeri”的进程,从队列中提取一行,并对其进行分析以获得ip。你知道吗
  3. 在main函数中,我启动这些进程并使用join()等待。你知道吗

代码如下

from multiprocessing import Process, Queue
from Queue import Empty
import os
import time    


def put_ip(src, q, number):
    """ 
    read file line by line, and put it to queue
    """
    print "start put_ip: %d" % os.getpid()
    with open(src) as f:
        for line in f:
            q.put(line)
        for i in range(number):
            q.put(EOFError)
    print "stop put_ip"


def get_ip(lock, src, result, index):
    """ 
    fetch line, and extract ip from it
    """
    print "start get_ip %d: %d" % (index, os.getpid())
    ips = []
    while True:
        line = src.get()
        if line == EOFError:
            print "%d get EOFError" % index
            break
        else:
            res = json.loads(line.strip())
            # process res, get ip
            ips.append(ip)
    print "get_ip %d get %d ips" % (os.getpid(), len(ips))
    result.put('\n'.join(ips))
    ips = []
    print "stop get_ip %d" % os.getpid()
    return


def test_get_ip(src, dest, number):
    """ 
    test with single process
    """
    srcq = Queue()
    result = Queue()

    with open(src) as f:
        for line in f:
            # if 'error' not in line:
            srcq.put(line)
        for i in range(number):
            srcq.put(EOFError)

    get_ip(srcq, result, 0)


def main(src, dest, number):
    """ 
    with multiprocess
    """
    srcq = Queue()
    result = Queue()

    producer = Process(target=put_ip, args=(src, srcq, number))

    consumers = [Process(target=get_ip, args=(srcq, result, i)) for i in xrange(number)]

    print 'start at %s' % time.asctime()
    starttime = time.time()

    producer.start()
    for consumer in consumers:
        consumer.start()

    producer.join()
    for consumer in consumers:
        consumer.join()

    with open(dest, 'w') as w:
        while True:
            try:
                res = result.get_nowait()
                w.write(res +'\n')
            except Empty:
                print 'Empty'
                break

    print "time: %f" % (time.time()-starttime)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', dest='src', required=True)
    parser.add_argument('-o', dest='dest', required=True)
    parser.add_argument('-n', dest='number', type=int, default=2)

    args = parser.parse_args()

    main(args.src, args.dest, args.number)
    # test_get_ip(args.src, args.dest, args.number)

结果很奇怪,使用者进程在工作完成后不会终止,并且主函数在join()处被阻塞。

使用不同的套装和代码进行测试,如下所示:

  • 使用test\u get\u ip()而不进行多重处理来处理大小日志文件,效果很好。你知道吗
  • main()与多处理一起使用来处理大型日志文件,它将在join()处阻塞。每个get\u ip进程将打印“stop get\u ip XXXX”,但不终止。你知道吗
  • 使用main()来处理2000行的较小日志文件,它也可以正常工作。get\u ip将终止。你知道吗
  • 如果我不将ip存储在get\u ip()的list ips中,它可以正常工作,不管日志文件大小。你知道吗

那么,有什么问题吗?清单上有限制吗?我错过什么了吗?你知道吗

我的机器环境是:

Linux 3.19.0-32-generic #37~14.04.1-Ubuntu SMP Thu Oct 22 09:41:40 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux 
Python 2.7.6 (default, Jun 22 2015, 17:58:13) 
[GCC 4.8.2] on linux2

谢谢你抽出时间!你知道吗


Tags: inipsrcnumberforgettimeput
1条回答
网友
1楼 · 发布于 2024-07-02 09:47:26

我认为您的问题是,您正在缓冲所有数据,并且只有在消费者完成时才发送结果。你知道吗

假设运行get_ip()的进程在一个列表中收集了1M个ip地址。现在,在终止之前,它需要序列化所有这些数据并通过Queue,然后main()函数将接收并反序列化所有这些数据。你知道吗

我的建议是,您直接将IP地址放入结果队列,然后让main()进程获取它们并在它们出现时写入它们。你知道吗

相关问题 更多 >