如何使用打包在di中的命令批量多处理文件列表

2024-09-29 21:50:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我从文件列表和包含命令的dict开始。你知道吗

现在我得到了一个worker函数,它将一批文件作为列表作为第一个参数,将dict作为第二个参数,并返回一个新的dict

我真的不知道如何让它在多处理中工作,worker函数并行运行x次,其中x是pc的CPU计数,如果文件列表不是“空”的话,它会在一个进程完成后立即启动新的进程。你知道吗

而且commandsdict需要一个带有批号的条目,或者我应该把它作为第三个参数传递吗?你知道吗

这将是没有问题的,收到所有的dict一起(在一个列表?)在最后,尽管当进程返回时能够用它做一些事情是很好的。你知道吗

它是针对Linux的,运行python3.5,我尝试了Process类,但是没有成功地使它工作,(我对编程还是很陌生,所以我可能看不到琐碎的东西解决方案:D),对于pool类,我不知道如何传递带有变异批号的dict。你知道吗

fileslist[1,2,3,4,5,6,7,8,9]
batchsize = 3
commanddict = {"BatchNumberStr": {"job{:05d}/".format(batchnumber): str}, {"command1": {value, type(value)}, etc}

(工人用此批号制作文件夹等) 我将值的类型存储在一个额外的部分中,以便以后进行安全检查,我知道这可能不是最好的方法,但它对我很有用现在:D你知道吗

现在我想拥有和cpu一样多的工作线程,比如这样的

numjobs = len(fileslist) / batchsize
if int(numjobs) != numjobs:
    numjobs = int(numjobs) +1 
else:
    numjobs = int(numjobs)
for i in range(numjobs):
    use = []
    for i in range(batchsize):
        try:
            use.append(fileslist.pop(0))
        except IndexError:
            break
    commanddict.update({"BatchNumberStr": {"job{:05d}/".format(i): str}}
    process = worker(deepcopy(use), deepcopy(commanddict))
    process.start()
    result = process.get()

process.join()

这只是擦伤 如何限制进程并在完成后加入新闻?如果有人能帮我做到这一点,如果有一个游泳池更好的话,我会非常感激:)

更新: 我的解决方案是:

def poolfeeder(allfileslist, workdict):
    a = 0
    numfiles = (list(x for x in workdict["NumberofFilesPerBatch"]))[0]
    status = True
    while status is True:
        filestouse = []
        for x in range(numfiles):
            try:
                filestouse.append(allfileslist.pop(0))
            except IndexError:
                status = False
                break
        a = a + 1
        workdict.update({"BatchNumberStr": {"job{:05d}/".format(a): str}})
        if len(filestouse) > 0:
            yield [deepcopy(filestouse), deepcopy(workdict)]
        else:
            return

def worker(listinput):
    filelist = listinput[0]
    commanddict = listinput[1]
    do stuff
    return result


poolfeed = poolfeeder(allfileslist, dict)
with mp.Pool(processes=numcpus) as p:
    reslist = [p.apply_async(worker, (r,)) for r in poolfeed]
    for result in reslist:
        print(result.get())

这样好吗?你知道吗

如果一个工作进程运行到RunTimeError中,它是保存为结果还是中断所有进程,如果不是,如果其中一个工作进程运行到RunTimeError中,我如何引发SystemExit?你知道吗


Tags: in列表for进程resultprocessdictworker

热门问题