Pyspark Luigi多工人问题

2024-10-03 02:38:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我想使用Luigi工作流在spark数据框中并行加载多个文件,并将它们存储在字典中。 加载所有文件后,我希望能够从main中的dictionary中访问这些数据帧,然后进行进一步处理。当我使用一个辅助程序运行Luigi时,此过程有效。如果使用多个辅助程序运行Luigi,则此变量在main方法中为空

任何建议都会有帮助

 import Luigi
    from Luigi import LocalTarget
    
    from pyspark import SQLContext
    
    from src.etl.SparkAbstract import SparkAbstract
    from src.util.getSpark import  get_spark_session
    from  src.util import getSpark,read_json
    import configparser as cp
    import  datetime
    from src.input.InputCSVFileComponent import InputCSVFile
    import os
    from src.etl.Component import ComponentInfo
    
    class fileloadTask(luigi.Task):
    
        compinfo = luigi.Parameter()
    
        def output(self):
            return luigi.LocalTarget("src/workflow_output/"+str(datetime.date.today().isoformat() )+"-"+ str(self.compinfo.id)+".csv")
    
        def run(self):
    
            a = InputCSVFile(self.compinfo)  ##this class is responsible to return the object of  spark dataframe and put it in dictionary
            a.execute()
            with self.output().open('w') as f:
                f.write("done")
    
    class EnqueueTask(luigi.WrapperTask):
     compinfo = read_json.read_json_config('path to json file')
    
        def requires(self):
            folders = [
                comp.id for comp in list(self.compinfo) if comp.component_type == 'INPUTFILE'
            ]
            print(folders)
            newcominfo = []
            for index, objid in enumerate(folders):
                newcominfo.append(self.compinfo[index])
    
            for i in newcominfo:
                print(f" in compingo..{i.id}")
    
            callmethod = [fileloadTask(compinfo) for compinfo in newcominfo]
            print(callmethod)
    
            return callmethod
    
    class MainTask(luigi.WrapperTask):
    
        def requires(self):
            return EnqueueTask()
    
        def output(self):
            return luigi.LocalTarget("src/workflow_output/"+str(datetime.date.today().isoformat() )+"-"+ "maintask"+".csv")
    
        def run(self):
            print(f"printing mapdf..{SparkAbstract.mapDf}")
            res = not SparkAbstract.mapDf
            print("Is dictionary empty ? : " + str(res)) ####-------------> this is empty when workers > 1 ################
            for key, value in SparkAbstract.mapDf.items():
                print("prinitng from dict")
                print(key, value.show(10))
    
            with self.output().open('w') as f:
                f.write("done")
    
    """
    entry point for spark application
    """
    if __name__ == "__main__":
        luigi.build([MainTask()],workers=2,local_scheduler=True)

Tags: infromimportselfsrcforoutputreturn
1条回答
网友
1楼 · 发布于 2024-10-03 02:38:47

每个工作进程都在其自己的进程中运行。这意味着工作人员不能共享python对象(在本例中是您将结果放入其中的字典)

一般来说,luigi最适合安排有副作用的任务(如写入文件等)

如果您试图并行化在内存中加载数据的任务,我建议使用dask而不是luigi

相关问题 更多 >