2024-09-30 06:22:24 发布
网友
我已经编写了一个Luigi管道来提取1.2百万个文件,然后对它们做一些sed工作-参见https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f。在
如果我在Luigi的几千个文件上运行这个,它的时间表很好。但是在整个数据集上运行它会抱怨Failed connecting to remote scheduler。不知道我是不是做对了。在
Failed connecting to remote scheduler
我建议不要在文件超过1k时为每个文件创建单独的任务,创建一个运行在这些文件目录上的批处理任务可能会更幸运。然后,此任务可以使用多处理来利用对处理函数的并行调用。在
from multiprocessing import Pool, cpu_count import os class TestTask(luigi.WrapperTask): inglob = luigi.Parameter(default='/1002/*.gz') outdir = luigi.Parameter(default='/1002-out/') tmpdir = luigi.Parameter(default='/1002-tmp/' def extract_file(filename): # extract file to self.tempdir not shown def output(self): return luigi.LocalTarget(self.outdir) def run(self): os.makedirs(self.tempdir) p = Pool(cpu_count()) p.map(extract_file, glob(self.inglob)) os.rename(self.tempdir, self.outdir)
我建议不要在文件超过1k时为每个文件创建单独的任务,创建一个运行在这些文件目录上的批处理任务可能会更幸运。然后,此任务可以使用多处理来利用对处理函数的并行调用。在
相关问题 更多 >
编程相关推荐