Handling a Queue asynchronously with pool.join()

Posted 2024-10-04 05:26:19


The Python docs say that if maxsize is less than or equal to zero, the queue size is infinite. I also tried maxsize=-1. But that is not the case here, and the program hangs. So as a workaround I created multiple Queues to handle the work. But this is not ideal, because to process bigger lists I have to create more and more Queue() objects and add extra code to process their elements.

import multiprocessing
from multiprocessing import Queue

queue = Queue(maxsize=0)
queue2 = Queue(maxsize=0)
queue3 = Queue(maxsize=0)
PROCESS_COUNT = 6

def filter(aBigList):

    list_chunks = list(chunks(aBigList, PROCESS_COUNT))

    pool = multiprocessing.Pool(processes=PROCESS_COUNT)

    for chunk in list_chunks:
        pool.apply_async(func1, (chunk,))

    pool.close()
    pool.join()

    allFiltered = []

    # list of dicts
    while not queue.empty():
        allFiltered.append(queue.get())

    while not queue2.empty():
        allFiltered.append(queue2.get())

    while not queue3.empty():
        allFiltered.append(queue3.get())

    # do work with allFiltered

def func1(subList):

    SUBLIST_SPLIT = 3

    theChunks = list(chunks(subList, SUBLIST_SPLIT))

    for i in theChunks[0]:

        dictQ = updateDict(i)
        queue.put(dictQ)

    for x in theChunks[1]:

        dictQ = updateDict(x)
        queue2.put(dictQ)

    for y in theChunks[2]:

        dictQ = updateDict(y)
        queue3.put(dictQ)
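(Side note: the chunks helper is not shown in the question. From the way it is called — chunks(subList, 3) has to yield exactly three pieces — it presumably splits a list into n roughly equal parts, along these lines:)

def chunks(seq, n):
    # Split seq into n roughly equal parts (assumed from how it is
    # used above; this is a sketch, not the asker's actual helper).
    k, m = divmod(len(seq), n)
    for i in range(n):
        start = i * k + min(i, m)
        yield seq[start:start + k + (1 if i < m else 0)]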

Tags: in, for, queue, count, process, chunks, list, pool
1 Answer
User
#1 · Posted on 2024-10-04 05:26:19

The problem happens because you do not process the Queue before the join call. When you use a multiprocessing.Queue, you should empty it before trying to join the feeder process. A Process waits for all the objects put in its Queue to be flushed before terminating. I don't know why this is the case even for Queues with a large maxsize, but it is probably related to the buffer of the underlying pipe not being large enough. So putting your get calls before pool.join should solve your problem.
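As a standalone illustration (a minimal sketch, not from the original post; the exact threshold depends on the OS pipe buffer size), joining the child before draining the queue deadlocks, while draining first does not:

import multiprocessing

def worker(q):
    # Put more data than the underlying pipe can buffer.
    for i in range(100000):
        q.put(i)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    # Calling p.join() here would hang: the child's feeder thread
    # blocks until the pipe is drained, so the child never exits.
    items = [q.get() for _ in range(100000)]
    p.join()  # safe now that the queue has been drained
    print(len(items))  # 100000

Applied to your code, that gives: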

import multiprocessing

PROCESS_COUNT = 6

def filter(aBigList):
    list_chunks = list(chunks(aBigList, PROCESS_COUNT))
    pool = multiprocessing.Pool(processes=PROCESS_COUNT)
    # A plain multiprocessing.Queue cannot be pickled and passed to
    # Pool workers, so use a Manager().Queue() instead.
    manager = multiprocessing.Manager()
    result_queue = manager.Queue()
    async_result = []
    for chunk in list_chunks:
        async_result.append(pool.apply_async(
                            func1, (chunk, result_queue)))

    # Drain the queue before joining: one None sentinel per task.
    all_filtered = []
    done = 0
    while done < len(list_chunks):
        res = result_queue.get()
        if res is None:
            done += 1
        else:
            all_filtered.append(res)

    pool.close()
    pool.join()

    # do work with all_filtered

def func1(sub_list, result_queue):
    # mapping function: push each result, then a sentinel
    for i in sub_list:
        result_queue.put(updateDict(i))
    result_queue.put(None)

A further question is why you need to handle the communication yourself at all. You can let the Pool manage the results for you if func1 simply returns them, roughly like this:

# Sketch reconstructed from context: let the Pool collect the
# return values instead of passing results through a Queue.
import multiprocessing

PROCESS_COUNT = 6

def filter(aBigList):
    list_chunks = list(chunks(aBigList, PROCESS_COUNT))
    pool = multiprocessing.Pool(processes=PROCESS_COUNT)
    async_result = [pool.apply_async(func1, (chunk,))
                    for chunk in list_chunks]
    pool.close()

    all_filtered = []
    for res in async_result:
        all_filtered.extend(res.get())

    pool.join()

    # do work with all_filtered

def func1(sub_list):
    # mapping function: just return the results
    return [updateDict(i) for i in sub_list]

This way you avoid that kind of error.

Edit: You can even reduce your code further by using the map function. If the chunks are too big, you can get an error while pickling the results (as mentioned in your comment). You can therefore use map to reduce the chunk size:

import multiprocessing

PROCESS_COUNT = 6

def filter(aBigList):
    # Run updateDict in parallel on chunks of 100 items from aBigList.
    # The map function takes care of the chunking, the dispatching and
    # the collecting of the items in the right order.
    with multiprocessing.Pool(processes=PROCESS_COUNT) as pool:
        allFiltered = pool.map(updateDict, aBigList, chunksize=100)

    # do work with allFiltered
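One caveat not covered in the answer: on platforms that spawn worker processes instead of forking (Windows, and macOS on recent Python versions), the call must live under an if __name__ == "__main__": guard, or each worker will re-import the module and try to build its own pool. A minimal, self-contained usage sketch (updateDict here is a hypothetical stand-in):

import multiprocessing

PROCESS_COUNT = 6

def updateDict(item):
    # hypothetical stand-in for the real updateDict
    return {"value": item}

def filter(aBigList):
    with multiprocessing.Pool(processes=PROCESS_COUNT) as pool:
        return pool.map(updateDict, aBigList, chunksize=100)

if __name__ == "__main__":
    filtered = filter(list(range(1000)))
    print(len(filtered))  # 1000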
