如何等待尚未提交到线程池的未来?

2024-10-01 15:35:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我想创建一个“TaskPoolManager”,它在ThreadPoolExecutor中启动“Task”(自定义对象),并根据重要性级别、提交后的时间等对它们进行优先级排序(这些是Task的属性)

我的问题是当ThreadPoolExecutor已满时,提交到池中的另一个任务将以“FIFO”方式执行,而不是按优先级排序

这里是TaskPoolManager类:


class TaskPoolManager:
    def __init__(self, max_workers: int = None):
        self.max_workers = max_workers or (os.cpu_count() or 1) * 5
        self._pool_executor = ThreadPoolExecutor(max_workers=self.max_workers, 
                                                 thread_name_prefix="TaskPoolManager")
        self.pending_task: Dict[Task, Future] = {Task(func=None): Future()}
        self.running_workers = 0

    # Task are callable
    def submit(self, task: Task) -> Future:
        if self.running_workers == self.max_workers:
            return self._add_task_to_queue(task)
        else:
            return self._start_task(task)

    def _start_task(self, task: Task) -> Future:
        """Submit a task in the pool"""
        self.running_workers = self.running_workers + 1
        future = self._pool_executor.submit(task)
        future.add_done_callback(lambda x: self._completed_thread())
        return future

    def _add_task_to_queue(self, task: Task) -> Future:
        """Add task to the not started task queue"""
        not_started_future = Future()
        self.pending_task[task] = not_started_future
        return not_started_future

    def _completed_thread(self):
        """Call when a thread in the pool as terminated a task"""
        self.running_workers = self.running_workers - 1
        self._start_task_in_queue()  # By priority level

下面是一个如何使用它的示例:

manager = TaskPoolManager()

for i in range(0, 10000):
    manager.submit(Task(func=wait_random_time_task))

f = manager.submit(Task(func=wait_random_time_task))

# This isn't submitted to the thread pool yet, but need to be waitable like it is.
f.result()

是否有方法将实例化的Future客户端连接到执行中稍后由ThreadPoolExecutor.submit创建的Future实例

如果没有,是否有方法返回类似Future的对象,该对象可以稍后与未来关联,并且仍然等待.result()

换句话说:如何等待尚未提交到线程池的未来


Tags: toselftaskdeffuturethreadrunningmax
1条回答
网友
1楼 · 发布于 2024-10-01 15:35:48

最后,事情并没有那么复杂:

def _start_task_in_queue(self):
    try:
        # Algorithm could be more complex than just the first one
        task, returned_future = next(iter(self.pending_task.items()))
    except StopIteration:
        return
    started_future = self._pool_executor.submit(task)
    started_future.add_done_callback(lambda result: returned_future.set_result(result))

我不知道这是否是解决我的问题的最佳方法,因为Future.__init__指定:

Initializes the future. Should not be called by clients

但它是这样工作的

相关问题 更多 >

    热门问题