在Python中实现一种特殊类型的多处理队列

2024-09-26 22:51:40 发布

您现在位置:Python中文网/ 问答频道 /正文

设想一个倒二叉树,节点A、B、C、D、E、F位于0级。节点G、H、I位于标高1,节点J位于标高2,节点K位于标高3。在

一级:G=func(A,B),H=func(C,D),I=func(E,F)

第2级:J=功能(G,H)

第三级:K=功能(J,I)。在

级别0上的每对节点都必须按顺序处理,级别1上的每对节点都可以按任何顺序处理,但下一级别上的结果必须按所示进行处理,以此类推,直到得到最终结果K

实际问题是一个计算几何问题,其中一系列的实体被融合在一起。A与B相邻,B与C相邻,依此类推。产生的A和B(G)保险丝与C和D(H)保险丝相邻。最终的结果是J和I(K)的熔合。所以你不能融合G和I,因为它们不是相邻的。如果一个级别上的节点数不是2的幂次方,那么最终将得到一个悬空的实体,该实体必须进一步处理一个级别。在

由于fuse进程计算开销大,内存密集,但非常并行,所以我希望使用Python多处理包和某种形式的队列。在计算G=func(A,B)之后,我想把结果G推到队列中,以便进行后续的J=func(G,H)计算。当队列为空时,最后一个结果就是最终结果。记住多人队列不一定会产生FIFO结果,因为I=func(E,F)可能在H=func(C,D)之前完成

我想出了一些(糟糕的)解决方案,但我相信有一个优雅的解决方案超出了我的掌握范围。建议?在


Tags: 功能实体节点队列顺序解决方案级别fuse
1条回答
网友
1楼 · 发布于 2024-09-26 22:51:40

我无法为队列想出一个聪明的设计,但是您可以很容易地用另一个进程替换队列,在我的示例中,我将其称为WorkerManager。此进程收集所有Worker进程的结果,并且仅当有两个相邻的数据包等待处理时才启动新的worker。这样,您就永远不会尝试连接非相邻的结果,所以您可以忽略“levels”并在下一对准备好后立即启动计算。在

from multiprocessing import Process, Queue

class Result(object):
    '''Result from start to end.'''
    def __init__(self, start, end, data):
        self.start = start
        self.end = end
        self.data = data


class Worker(Process):
    '''Joins two results into one result.'''
    def __init__(self, result_queue, pair):
        self.result_queue = result_queue
        self.pair = pair
        super(Worker, self).__init__()

    def run(self):
        left, right = self.pair
        result = Result(left.start, right.end,
                        '(%s, %s)' % (left.data, right.data))
        self.result_queue.put(result)


class WorkerManager(Process):
    '''
    Takes results from result_queue, pairs them
    and assigns workers to process them.
    Returns final result into final_queue.
    '''
    def __init__(self, result_queue, final_queue, start, end):
        self._result_queue = result_queue
        self._final_queue = final_queue
        self._start = start
        self._end = end
        self._results = []
        super(WorkerManager, self).__init__()

    def run(self):
        while True:
            result = self._result_queue.get()
            self._add_result(result)
            if self._has_final_result():
                self._final_queue.put(self._get_final_result())
                return
            pair = self._find_adjacent_pair()
            if pair:
                self._start_worker(pair)

    def _add_result(self, result):
        self._results.append(result)
        self._results.sort(key=lambda result: result.start)

    def _has_final_result(self):
        return (len(self._results) == 1
                and self._results[0].start == self._start
                and self._results[0].end == self._end)

    def _get_final_result(self):
        return self._results[0]

    def _find_adjacent_pair(self):
        for i in xrange(len(self._results) - 1):
            left, right = self._results[i], self._results[i + 1]
            if left.end == right.start:
                self._results = self._results[:i] + self._results[i + 2:]
                return left, right

    def _start_worker(self, pair):
        worker = Worker(self._result_queue, pair)
        worker.start()

if __name__ == '__main__':
    DATA = [Result(i, i + 1, str(i)) for i in xrange(6)]
    result_queue = Queue()
    final_queue = Queue()
    start = 0
    end = len(DATA)
    man = WorkerManager(result_queue, final_queue, start, end)
    man.start()
    for res in DATA:
        result_queue.put(res)
    final = final_queue.get()
    print final.start
    # 0
    print final.end
    # 6
    print final.data
    # For example:
    # (((0, 1), (2, 3)), (4, 5))

在我的例子中,我使用了一个简单的Worker,它返回括号中的给定数据,用逗号分隔,但您可以在其中放入任何计算。在我的例子中,最终结果是(((0, 1), (2, 3)), (4, 5)),这意味着算法在计算((0, 1), (2, 3))之前计算(0, 1)和{},然后将结果与(4, 5)结合。我希望这就是你想要的。在

相关问题 更多 >

    热门问题