如何为并发编写进度计数器/时间?

2024-06-25 22:58:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个非常简单的Clock类,它将记录时间、任务量,并允许代码的其他部分调用它的静态方法Clock.increment(),以便用户可以获得进度反馈。但是,多进程为每个进程创建单独的副本,即使我在主进程中初始化类,子进程也无法访问它。这是我的Clock类:

class Counter(object):
    '''
    This counter needs to be initiated
    '''
    startTime = time.time()
    currentProgress = 0

    def __init__(self, totalTask):
        # self.startTime = time.time()
        Counter.totalTask = totalTask
        print("Counter initiated")

    def increment():
        Counter.currentProgress += 1
        Counter.expectedTime = ((time.time() - Counter.startTime) / Counter.currentProgress)*(Counter.totalTask - Counter.currentProgress)
        print("Progress: "+str(Counter.currentProgress)+" / "+ str(Counter.totalTask) + " : " + str(float(Counter.currentProgress) / float(Counter.totalTask)*100)+"%")
        print("Expected finish in: " + str(Counter.expectedTime/3600.0) + " hrs")

    increment = staticmethod(increment)

我这样称呼它:

if __name__ == "__main__":

    example = [1,2,3,4,5]

    counter = Counter(len(example))

    p = Pool(processes=2)
    p.map(conprintNum, example)

def conprintNum(num):
    print(num)
    Counter.increment()

这是行不通的。我不想将总任务数手工编码到Clock类中,但这可能是最后的手段(违反了所有良好的编程实践)。不管怎样,我是否可以同时进行这项工作?我也尝试了单例模式,但是没有,它并不能真正解决问题。你知道吗


Tags: selftime进程exampledefcounterprintstarttime
1条回答
网友
1楼 · 发布于 2024-06-25 22:58:35

可以使用^{}在所有子进程之间共享Counter实例:

import time
from functools import partial
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

if __name__ == "__main__":

    example = [1,2,3,4,5]

    # Create our custom manager, register the Counter object with it,
    # start it, and then create our shared Counter instance.
    m = BaseManager()
    m.register('Counter', Counter)
    m.start()
    counter = m.Counter(len(example))

    p = Pool(processes=2)

    # We create a partial so that it's easier to pass the counter instance
    # along with every value in our example iterable.
    func = partial(conprintNum, counter)

    p.map(func, example)

输出:

Counter initiated
1
Progress: 1 / 5 : 20.0%
Expected finish in: 4.0926668379e-05 hrs
2
Progress: 2 / 5 : 40.0%
Expected finish in: 1.61524613698e-05 hrs
3
Progress: 3 / 5 : 60.0%
Expected finish in: 7.86887274848e-06 hrs
4
Progress: 4 / 5 : 80.0%
Expected finish in: 3.15326783392e-06 hrs
5
Progress: 5 / 5 : 100.0%
Expected finish in: 0.0 hrs

编辑:

正如在注释中指出的,这里有一个竞争条件,多个进程可以一次进入increment方法,导致输出的显示方式与您希望的不同。如果我们在increment中添加sleep调用,可以更清楚地看到这一点:

def increment():
    Counter.currentProgress += 1
    time.sleep(random.randint(1,6)) # Artificially delay execution.
    Counter.expectedTime = ((time.time() - Counter.startTime) / Counter.currentProgress)*(Counter.totalTask - Counter.currentProgress)
    print("Progress: "+str(Counter.currentProgress)+" / "+ str(Counter.totalTask) + " : " + str(float(Counter.currentProgress) / float(Counter.totalTask)*100)+"%")
    print("Expected finish in: " + str(Counter.expectedTime/3600.0) + " hrs")

现在,输出如下:

Counter initiated
1
2
3
4
Progress: 4 / 5 : 80.0%
Expected finish in: 7.12237589889e-05 hrs
Progress: 4 / 5 : 80.0%
Expected finish in: 7.15903441111e-05 hrs
5
Progress: 5 / 5 : 100.0%
Expected finish in: 0.0 hrs
Progress: 5 / 5 : 100.0%
Expected finish in: 0.0 hrs
Progress: 5 / 5 : 100.0%
Expected finish in: 0.0 hrs

显然那不好。但是,通过在Lock块内调用increment,这是很容易避免的:

def conprintNum(counter, lock, num):
    print(num)
    with lock:
        counter.increment()

if __name__ == "__main__":

    example = [1,2,3,4,5]

    m = SyncManager()  # SyncManager, rather than BaseManager
    m.register('Counter', Counter)
    m.start()
    lock = m.Lock()  # SyncManager comes with a shared Lock implementation.
    counter = m.Counter(len(example))
    p = Pool(processes=4)
    func = partial(conprintNum, counter, lock)

    p.map(func, example)

相关问题 更多 >