基于tim的多处理

2024-09-29 22:19:51 发布

您现在位置:Python中文网/ 问答频道 /正文

我要做一个大名单。设f()为对L进行操作的函数。f()接受另一个变量,该变量每15分钟过期一次,需要更新。下面是一个串行示例:

def main():
    L = openList()
    # START THE CLOCK
    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)
    a = getRenewed()
    for item in L:
        f(item, a)   # operate on item given a
        # CHECK TIME REMAINING
        clockCur = dt.datetime.now()
        clockRem = (clockExp - clockCur).total_seconds()
        # RENEW a IF NEEDED
        if clockRem < 5: # renew with 5 seconds left
            clockStart = dt.datetime.now()
            clockExp = clockStart + dt.timedelta(seconds=900)
            a = getRenewed()

因为f()需要几秒钟的时间(有时甚至更长),所以我想并行化代码。有没有关于计时器的提示?我设想共享clockExp和“a”,当一个进程满足clockRem<;5时,它调用getRenewed()并共享新的“a”和clockExp,然后重复。在


Tags: 函数示例datetimedefdtitemnowtimedelta
1条回答
网友
1楼 · 发布于 2024-09-29 22:19:51

如果getRenewed是幂等的(也就是说,您可以多次调用它而不会产生副作用),那么您只需将现有的计时器代码移动到工作进程中,并让它们在发现自己的计时器已用完时调用一次。这只需要同步您传入的列表中的项,multiprocessing.Pool可以很容易地处理:

def setup_worker():
    global clockExp, a

    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)
    a = getRenewed()

def worker(item):
    global clockExp, a

    clockCur = dt.datetime.now()
    clockRem = (clockExp - clockCur).total_seconds()

    if clockRem < 5: # renew with 5 seconds left
        clockStart = dt.datetime.now()
        clockExp = clockStart + dt.timedelta(seconds=900)
        a = getRenewed()

    f(item, a)

def main(L):
    pool = multiprocessing.Pool(initializer=setup_worker)

    pool.map(worker, L)

如果getRenewed不是幂等的,则需要稍微复杂一点。您将无法在每个工作进程中调用它,因此您需要在您的进程之间设置某种通信方法,以便它们能够在可用时获得最新版本。在

我建议使用multiprocessing.queuea值从主进程传递给工人。您仍然可以对列表项使用Pool,只需确保从主进程异步使用它。像这样,也许:

^{pr2}$

worker仍然需要有一些计时代码,因为否则您将面临一个争用条件,其中一个worker可能会使用主进程在一个批中发送到队列中的两个a值。如果对f的一些调用比其他调用慢得多(这可能是因为它们涉及到从web下载东西)。在

相关问题 更多 >

    热门问题