组织Python的多处理队列的内容/输出

2024-10-04 15:31:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在编写一个脚本来处理一个类对象的几个不同实例,该类对象包含许多属性和方法。对象都放在一个列表(myobjects = [myClass(IDnumber=1), myClass(IDnumber=2), myClass(IDnumber=3)])中,然后通过相当简单的for循环进行修改,for循环从对象中调用特定函数

for x in myobjects: 
    x.myfunction()

这个脚本利用日志记录,将所有输出转发到一个日志文件,我可以稍后检查。我正在尝试并行化这个脚本,因为这样做非常简单(下面的示例),并且需要利用队列来组织每个进程的所有日志输出。这一方面工作完美无瑕——我可以为每个进程定义一个新的日志文件,然后将特定于对象的日志文件传递回我的主脚本,然后主脚本可以通过依次附加每个次要日志文件来组织主日志文件

from multiprocessing import Process, Queue
q = Queue()
threads = []
mainlog = 'mylogs.log'    #this is set up in my __init__.py but included here as demonstration
for x in myobjects:
    logfile = x.IDnumber+'.log'
    thread = Process(target=x.myfunction(), args=(logfile, queue))
    threads.append(thread)
    thread.start()
for thread in threads:
    if thread.is_alive():
        thread.join()
while not queue.empty():
    minilog = queue.get()
    minilog_open = open(minilog, 'r')
    mainlog_open = open(mainlog, 'a+')
    mainlog_open.write(minilog_open.read())

现在,我的问题是,我还需要这些对象来更新一个特定的属性,x.success,无论它是真是假。通常,在串行中,x.successx.myfunction()的末尾被更新,并被发送到它需要去的脚本下,并且一切都很好。然而,在这个并行实现中,x.myfunction填充了x.success在这个过程中,但是这些信息永远不会返回到主脚本-因此如果我在{}中添加{},我会看到{}或{},但是如果我在{}块之后添加{},我只会看到{}。我意识到我可以用与使用{}相同的方法在{}中使用{},但当两个或多个进程同时完成时会发生什么呢?据我所知,无法保证我的队列会像

  1. 日志文件(用于myobjects[0]
  2. success=True(对于myobjects[0]
  3. 日志文件(用于myobjects[1]
  4. success=False(对于myobjects[1])(等)

如果队列同时包含日志文件和变量,如何组织队列中特定于对象的输出?我需要知道每个x.myfunction()x.success的内容,以便以某种方式将信息返回到主进程


Tags: 文件对象in脚本for队列进程open
1条回答
网友
1楼 · 发布于 2024-10-04 15:31:08

OP要求提供一个示例来演示我在评论中提到的概念。解释如下:

import concurrent.futures


class MyObject:
    def __init__(self):
        self._ID = str(id(self))
        self._status = None

    @property
    def ID(self):
        return self._ID

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, status):
        self._status = status

    def MyFunction(self):
        # do the real work here
        self.status = True


def MyThreadFunc(args):
    myObject = args[0]
    myObject.MyFunction()
    # note that the wrapper function returns a tuple
    return myObject.status, myObject.ID


if __name__ == '__main__':
    N = 10  # number of instances of MyObject
    myObjects = [MyObject() for _ in range(N)]
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(MyThreadFunc, [o]): o for o in myObjects}
        for future in concurrent.futures.as_completed(futures):
            _status, _id = future.result()
            print(f'Status is {_status} for ID {_id}')

MyObject类显然做的不多。其关键特性是它有一个字符串版本的id、一个状态和一个函数,该函数执行某些操作,但隐式返回无

我们编写了一个包装器函数,它引用MyObject的实例(iterable参数中的第一个元素),在该特定类实例上执行MyFunction(),然后以元组的形式返回该类的ID和状态

主循环使用了一种我经常使用的模式,我相信其他很多人也会这样做。使用字典理解,我们构建了所谓的“未来”。请记住,submit()的第二个参数必须是iterable,即使MyThreadFunc只需要一个值

然后,我们等待线程完成并获取其返回值

相关问题 更多 >

    热门问题