在pythonluigi上调度大量作业

2024-09-30 06:22:24 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经编写了一个Luigi管道来提取1.2百万个文件,然后对它们做一些sed工作-参见https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f。在

如果我在Luigi的几千个文件上运行这个,它的时间表很好。但是在整个数据集上运行它会抱怨Failed connecting to remote scheduler。不知道我是不是做对了。在


Tags: 文件to数据httpsgithubcom管道时间表
1条回答
网友
1楼 · 发布于 2024-09-30 06:22:24

我建议不要在文件超过1k时为每个文件创建单独的任务,创建一个运行在这些文件目录上的批处理任务可能会更幸运。然后,此任务可以使用多处理来利用对处理函数的并行调用。在

from multiprocessing import Pool, cpu_count
import os

class TestTask(luigi.WrapperTask):
    inglob = luigi.Parameter(default='/1002/*.gz')
    outdir = luigi.Parameter(default='/1002-out/')
    tmpdir = luigi.Parameter(default='/1002-tmp/'

    def extract_file(filename):
        # extract file to self.tempdir not shown

    def output(self):
        return luigi.LocalTarget(self.outdir)

    def run(self):
        os.makedirs(self.tempdir)
        p = Pool(cpu_count())
        p.map(extract_file, glob(self.inglob))
        os.rename(self.tempdir, self.outdir)

相关问题 更多 >

    热门问题