任务队列大小有限的Python线程池

2024-10-01 17:23:46 发布

您现在位置:Python中文网/ 问答频道 /正文

我的问题是:我有一个multiprocessing.pool.ThreadPool对象和一个主worker_count工人和一个主pqueue,我从中向池中提供任务。在

流程如下:有一个主循环从pqueue获取level级别的项,并使用apply_async将其提交给池。处理该项时,它生成level + 1的项。问题是池接受所有任务并按提交的顺序处理它们。在

更准确地说,正在发生的是,level 0项被处理,每个项生成100个立即从pqueue检索并添加到池中的level 1项,每个level 1项生成提交给池的100level 2项,以此类推,这些项以BFS方式处理。在

我需要告诉池不要接受超过worker_count个项目,以便有机会从pqueue检索更高级别的项目,以便以DFS方式处理项目。在

我目前的解决方案是:对于每个提交的任务,将AsyncResult对象保存在asyncres_list列表中,在从pqueue检索项之前,我删除已处理的项(如果有),检查asyncres_list的长度是否低于池中每0.5秒的线程数,像这样,只有thread_number个项目将同时被处理。在

我想知道是否有更干净的方法来实现这种行为,而且我似乎在文档中找不到一些参数来限制可以提交给池的最大任务数。在


Tags: 项目对象count方式流程级别levelmultiprocessing
1条回答
网友
1楼 · 发布于 2024-10-01 17:23:46

ThreadPool是一个用于普通任务的简单工具。如果您想自己管理队列,以获得DFS行为;您可以直接在threadingqueue模块上实现必要的功能。在

要防止计划下一个根任务,直到当前任务派生的所有任务都完成(“类似DFS”的顺序),可以use ^{}

#!/usr/bin/env python3
import queue
import random
import threading
import time

def worker(q, multiplicity=5, maxlevel=3, lock=threading.Lock()):
    for task in iter(q.get, None):  # blocking get until None is received
        try:
            if len(task) < maxlevel:
                for i in range(multiplicity):
                    q.put(task + str(i))  # schedule the next level
            time.sleep(random.random())  # emulate some work
            with lock:
                print(task)
        finally:
            q.task_done()

worker_count = 2
q = queue.LifoQueue()
threads = [threading.Thread(target=worker, args=[q], daemon=True)
           for _ in range(worker_count)]
for t in threads:
    t.start()

for task in "01234":  # populate the first level
    q.put(task)
    q.join()  # block until all spawned tasks are done
for _ in threads:  # signal workers to quit
    q.put(None)
for t in threads:  # wait until workers exit
    t.join()

代码示例是从the ^{} module documentation中的示例派生的。在

每个级别的任务生成multiplicity直接子任务,这些子任务生成它们自己的子任务,直到到达maxlevel。在

None用于向工人发出辞职的信号。t.join()用于等待线程正常退出。如果主线程因任何原因中断,那么守护进程线程将被终止,除非存在其他非守护进程线程(您可能需要提供SIGINT hanlder,以指示工作线程在Ctrl+C上优雅地退出,而不是仅仅死机)。在

使用queue.LifoQueue(),以获得“后进先出”顺序(由于多个线程,这是近似值)。在

未设置maxsize,否则工作线程可能会死锁,因此您必须将任务放在某个地方。worker_count无论任务队列是什么,后台线程都在运行。在

相关问题 更多 >

    热门问题