具有有状态工作线程的线程池执行器

2024-09-28 03:20:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用一个Backend类,它生成一个子进程来执行CPU绑定的工作。我无法控制该类,基本上唯一的交互方式是创建一个实例backend = Backend(),并通过backend.run(data)提交工作(这反过来将工作提交到子流程和块,直到完成)。因为这些计算需要相当长的时间,我想并行执行它们。由于Backend类已经生成了自己的子进程来执行实际工作,因此这似乎是一种IO绑定的情况

所以我考虑使用多个线程,每个线程使用自己的Backend实例。我可以手动创建这些线程,并通过队列连接它们。以下是一些Backend模拟类的示例实现:

import os
import pty
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread


class Backend:
    def __init__(self):
        f, g = pty.openpty()
        self.process = Popen(
            ['bash'],  # example program
            text=True, bufsize=1, stdin=PIPE, stdout=g)
        self.write = self.process.stdin.write
        self.read = os.fdopen(f).readline

    def __enter__(self):
        self.write('sleep 2\n')  # startup work
        return self

    def __exit__(self, *exc):
        self.process.stdin.close()
        self.process.kill()

    def run(self, x):
        self.write(f'sleep {x} && echo "ok"\n')  # perform work
        return self.read().strip()


class Worker(Thread):
    def __init__(self, inq, outq, **kwargs):
        super().__init__(**kwargs)
        self.inq = inq
        self.outq = outq

    def run(self):
        with Backend() as backend:
            while True:
                data = self.inq.get()
                result = backend.run(data)
                self.outq.put((data, result))


task_queue = Queue()
result_queue = Queue()

n_workers = 3
threads = [Worker(task_queue, result_queue, daemon=True) for _ in range(n_workers)]
for thread in threads:
    thread.start()

data = [2]*7
for x in data:
    task_queue.put(x)

for _ in data:
    print(f'Result ready: {result_queue.get()}')

因为Backend需要在启动时执行一些工作,所以我不想为每个任务创建一个新实例。因此,每个Worker为其整个生命周期创建一个Backend实例。同样重要的是,每个工作人员都有自己的后端,这样他们就不会相互干扰

现在有一个问题:我可以用^{}来完成这个任务吗?看起来^{}方法是合适的候选者,但我不知道如何确保每个工作者都收到自己的Backend实例(在任务之间需要持久化)


Tags: 实例runimportselfbackendfordataqueue
1条回答
网友
1楼 · 发布于 2024-09-28 03:20:35

工作线程的状态可以保存在全局命名空间中,例如作为dict。然后^{}可以用于保存/加载每个工作线程的状态^{}可以作为上下文管理器适当地处理Backend

from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
import os
import pty
from subprocess import PIPE, Popen
import threading


class Backend:
    ...


backends = {}
exit_stack = ExitStack()


def init_backend():
    backends[threading.current_thread()] = exit_stack.enter_context(Backend())


def compute(data):
    return data, backends[threading.current_thread()].run(data)


with exit_stack:
    with ThreadPoolExecutor(max_workers=3, initializer=init_backend) as executor:
        for result in executor.map(compute, [2]*7):
            print(f'Result ready: {result}')

相关问题 更多 >

    热门问题