在python多处理p中为worker获取唯一ID

2024-09-28 19:04:50 发布

您现在位置:Python中文网/ 问答频道 /正文

有没有一种方法可以为python多处理池中的每个worker分配一个惟一的ID,从而使池中的某个特定worker运行的作业能够知道哪个worker在运行它?根据文件,a Process有一个name但是

The name is a string used for identification purposes only. It has no semantics. Multiple processes may be given the same name.

对于我的特定用例,我想在一个由四个GPU组成的组上运行一组作业,并且需要为该作业应该运行的GPU设置设备号。由于作业的长度不一致,我希望确保在前一个作业完成之前,在试图在其上运行的作业的GPU上不会发生冲突(因此这排除了提前为工作单元预先分配ID)。


Tags: 文件the方法nameidforstringgpu
3条回答

我用线程完成了这项工作,最后使用a queue来处理作业管理。这是基线。我的完整版本有一堆try-catches(特别是在worker中,以确保即使失败也调用q.task_done())。

from threading import Thread
from queue import Queue
import time
import random


def run(idx, *args):
    time.sleep(random.random() * 1)
    print idx, ':', args


def run_jobs(jobs, workers=1):
    q = Queue()
    def worker(idx):
        while True:
            args = q.get()
            run(idx, *args)
            q.task_done()

    for job in jobs:
        q.put(job)

    for i in range(0, workers):
        t = Thread(target=worker, args=[i])
        t.daemon = True
        t.start()

    q.join()


if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0,10)], workers=5)

我不需要使用多处理(我的工作人员只是调用外部进程),但这可以扩展。用于多处理的API改变了它,下面是如何适应的:

from multiprocessing import Process, Queue
from Queue import Empty
import time
import random

def run(idx, *args):
    time.sleep(random.random() * i)
    print idx, ':', args


def run_jobs(jobs, workers=1):
    q = Queue()
    def worker(idx):
        try:
            while True:
                args = q.get(timeout=1)
                run(idx, *args)
        except Empty:
            return

    for job in jobs:
        q.put(job)

    processes = []
    for i in range(0, workers):
        p = Process(target=worker, args=[i])
        p.daemon = True
        p.start()
        processes.append(p)

    for p in processes: 
        p.join()


if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0,10)], workers=5)

两个版本都将输出如下内容:

0 : ('job', 0)
1 : ('job', 2)
1 : ('job', 6)
3 : ('job', 3)
0 : ('job', 5)
1 : ('job', 7)
2 : ('job', 1)
4 : ('job', 4)
3 : ('job', 8)
0 : ('job', 9)

您可以使用multiprocessing.Queue来存储id,然后在池进程初始化时获取id。

优点:

  • 你不需要依赖内部。
  • 如果您的用例是管理资源/设备,那么您可以直接输入设备号。这也将确保没有设备被使用两次:如果池中的进程多于设备,则附加进程将在queue.get()上阻塞,并且不会执行任何工作(这不会阻塞porgram,或者至少在我测试时不会)。

缺点:

  • 你有额外的通信开销和产生池 进程需要稍长的时间:没有 示例所有工作都可以由第一个流程执行,就像其他流程一样 尚未完成初始化。
  • 你需要一个全球性的(或者至少我 不知道该怎么办)

示例:

import multiprocessing
from time import sleep

def init(queue):
    global idx
    idx = queue.get()

def f(x):
    global idx
    process = multiprocessing.current_process()
    sleep(1)
    return (idx, process.pid, x * x)

ids = [0, 1, 2, 3]
manager = multiprocessing.Manager()
idQueue = manager.Queue()

for i in ids:
    idQueue.put(i)

p = multiprocessing.Pool(8, init, (idQueue,))
print(p.map(f, range(8)))

输出:

[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)]

注意,虽然池包含8个进程,并且一个idx仅由一个进程使用,但只有4个不同的pid。

你想要的似乎很简单:multiprocessing.current_process()。例如:

import multiprocessing

def f(x):
    print multiprocessing.current_process()
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))

输出:

$ python foo.py 
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-3, started daemon)>
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-4, started daemon)>
[0, 1, 4, 9, 16, 25]

这将返回进程对象本身,因此进程可以是自己的标识。你也可以对它调用id来获得一个唯一的数字id——在cpython中,这是进程对象的内存地址,所以我不认为有重叠的可能。最后,您可以使用进程的identpid属性——但这只在进程启动时设置。

此外,在我看来,在源代码中,自动生成的名称(如上面的Processrepr字符串中的第一个值所示)很可能是唯一的。multiprocessing为每个进程维护一个itertools.counter对象,该对象用于为其生成的任何子进程生成一个^{}元组。因此,顶级进程生成具有单值id的子进程,它们生成具有两个值id的进程,依此类推。然后,如果没有名称传递给Process构造函数,那么它只是基于autogenerates the name标识,使用':'.join(...)。然后使用replace的进程的Poolalters the name,保持自动生成的id相同。

所有这一切的结果是,尽管两个Processes可能具有相同的名称,因为您在创建它们时可能会将相同的名称赋给它们,但如果您不触摸name参数,它们是唯一的。另外,理论上可以使用_identity作为唯一标识符;但我认为他们将该变量设为私有是有原因的!

上述措施的一个例子:

import multiprocessing

def f(x):
    created = multiprocessing.Process()
    current = multiprocessing.current_process()
    print 'running:', current.name, current._identity
    print 'created:', created.name, created._identity
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))

输出:

$ python foo.py 
running: PoolWorker-1 (1,)
created: Process-1:1 (1, 1)
running: PoolWorker-2 (2,)
created: Process-2:1 (2, 1)
running: PoolWorker-3 (3,)
created: Process-3:1 (3, 1)
running: PoolWorker-1 (1,)
created: Process-1:2 (1, 2)
running: PoolWorker-2 (2,)
created: Process-2:2 (2, 2)
running: PoolWorker-4 (4,)
created: Process-4:1 (4, 1)
[0, 1, 4, 9, 16, 25]

相关问题 更多 >