我想使用Luigi工作流在spark数据框中并行加载多个文件,并将它们存储在字典中。 加载所有文件后,我希望能够从main中的dictionary中访问这些数据帧,然后进行进一步处理。当我使用一个辅助程序运行Luigi时,此过程有效。如果使用多个辅助程序运行Luigi,则此变量在main方法中为空
任何建议都会有帮助
import Luigi
from Luigi import LocalTarget
from pyspark import SQLContext
from src.etl.SparkAbstract import SparkAbstract
from src.util.getSpark import get_spark_session
from src.util import getSpark,read_json
import configparser as cp
import datetime
from src.input.InputCSVFileComponent import InputCSVFile
import os
from src.etl.Component import ComponentInfo
class fileloadTask(luigi.Task):
compinfo = luigi.Parameter()
def output(self):
return luigi.LocalTarget("src/workflow_output/"+str(datetime.date.today().isoformat() )+"-"+ str(self.compinfo.id)+".csv")
def run(self):
a = InputCSVFile(self.compinfo) ##this class is responsible to return the object of spark dataframe and put it in dictionary
a.execute()
with self.output().open('w') as f:
f.write("done")
class EnqueueTask(luigi.WrapperTask):
compinfo = read_json.read_json_config('path to json file')
def requires(self):
folders = [
comp.id for comp in list(self.compinfo) if comp.component_type == 'INPUTFILE'
]
print(folders)
newcominfo = []
for index, objid in enumerate(folders):
newcominfo.append(self.compinfo[index])
for i in newcominfo:
print(f" in compingo..{i.id}")
callmethod = [fileloadTask(compinfo) for compinfo in newcominfo]
print(callmethod)
return callmethod
class MainTask(luigi.WrapperTask):
def requires(self):
return EnqueueTask()
def output(self):
return luigi.LocalTarget("src/workflow_output/"+str(datetime.date.today().isoformat() )+"-"+ "maintask"+".csv")
def run(self):
print(f"printing mapdf..{SparkAbstract.mapDf}")
res = not SparkAbstract.mapDf
print("Is dictionary empty ? : " + str(res)) ####-------------> this is empty when workers > 1 ################
for key, value in SparkAbstract.mapDf.items():
print("prinitng from dict")
print(key, value.show(10))
with self.output().open('w') as f:
f.write("done")
"""
entry point for spark application
"""
if __name__ == "__main__":
luigi.build([MainTask()],workers=2,local_scheduler=True)
每个工作进程都在其自己的进程中运行。这意味着工作人员不能共享python对象(在本例中是您将结果放入其中的字典)
一般来说,luigi最适合安排有副作用的任务(如写入文件等)
如果您试图并行化在内存中加载数据的任务,我建议使用dask而不是luigi
相关问题 更多 >
编程相关推荐