我正在使用一个Backend
类,它生成一个子进程来执行CPU绑定的工作。我无法控制该类,基本上唯一的交互方式是创建一个实例backend = Backend()
,并通过backend.run(data)
提交工作(这反过来将工作提交到子流程和块,直到完成)。因为这些计算需要相当长的时间,我想并行执行它们。由于Backend
类已经生成了自己的子进程来执行实际工作,因此这似乎是一种IO绑定的情况
所以我考虑使用多个线程,每个线程使用自己的Backend
实例。我可以手动创建这些线程,并通过队列连接它们。以下是一些Backend
模拟类的示例实现:
import os
import pty
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
class Backend:
def __init__(self):
f, g = pty.openpty()
self.process = Popen(
['bash'], # example program
text=True, bufsize=1, stdin=PIPE, stdout=g)
self.write = self.process.stdin.write
self.read = os.fdopen(f).readline
def __enter__(self):
self.write('sleep 2\n') # startup work
return self
def __exit__(self, *exc):
self.process.stdin.close()
self.process.kill()
def run(self, x):
self.write(f'sleep {x} && echo "ok"\n') # perform work
return self.read().strip()
class Worker(Thread):
def __init__(self, inq, outq, **kwargs):
super().__init__(**kwargs)
self.inq = inq
self.outq = outq
def run(self):
with Backend() as backend:
while True:
data = self.inq.get()
result = backend.run(data)
self.outq.put((data, result))
task_queue = Queue()
result_queue = Queue()
n_workers = 3
threads = [Worker(task_queue, result_queue, daemon=True) for _ in range(n_workers)]
for thread in threads:
thread.start()
data = [2]*7
for x in data:
task_queue.put(x)
for _ in data:
print(f'Result ready: {result_queue.get()}')
因为Backend
需要在启动时执行一些工作,所以我不想为每个任务创建一个新实例。因此,每个Worker
为其整个生命周期创建一个Backend
实例。同样重要的是,每个工作人员都有自己的后端,这样他们就不会相互干扰
现在有一个问题:我可以用^{Backend
实例(在任务之间需要持久化)
工作线程的状态可以保存在全局命名空间中,例如作为dict。然后^{} 可以用于保存/加载每个工作线程的状态^{} 可以作为上下文管理器适当地处理
Backend
相关问题 更多 >
编程相关推荐