
2024-10-01 15:44:12

import luigi
from helpers import SQLTask
import helpers
import logging 
import time

acctDate = '201904'
ssisDate = '201905'
runDesc0xx = 'prod period 4 test2'  
runDesc9xx = 'test2'  

YY = acctDate[:4]
MM = acctDate[4:6]

bdx_sql = 'r:\\1.SQL\\BDX_SQL\\'
cmdList = {
        'BDX010': (f'"{bdx_sql}BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 ',''),
        'BDX020': (f'"{bdx_sql}BDX_001_NI_DM 020.sql"  ','BDX010'),
        'BDX022a': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql"  -S LWVPDBSQLC070 ','BDX020'),
        'BDX022b': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql"  -S LWVPDBSQLC070  -v Year1={YY} MM={MM}','BDX022a'),
        'BDX022c': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql"  -v Year={YY} Month={MM}', 'BDX022b'),

class BDX_Task(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter(default=None)
    queryKey = luigi.Parameter()
    queryCmd = luigi.Parameter()
    runDesc = luigi.Parameter()
    dependQry = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super(BDX_Task, self).__init__(*args, **kwargs)
        self.trans_id = f"00903_BDX_Query_{self.queryKey}__{self.runDesc}"

    def requires(self):
        cmdListComb = dict(cmdList)

        if self.dependQry != '' and self.dependQry in cmdListComb:
            dep_cmd, dep_dep_key = cmdListComb[self.dependQry]

            klass = globals()[self.dependQry]
            return [klass(         
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = self.dependQry,
                queryCmd = dep_cmd,
                runDesc = self.runDesc,
                dependQry = dep_dep_key
            return []

    def run(self):

        strQuery_and_args = f""" -i {self.queryCmd} """

class BDX_Query_0XX(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter()  
    runDesc = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super(BDX_Query_0XX, self).__init__(*args, **kwargs)

        self.trans_id =  "00902_BDX_Query_0XX" + "__" + self.runDesc  # static.

    def requires(self):
        for queryKey, (queryCmd, dependQry) in cmdList.items():
            klass = type(queryKey, (BDX_Task,),{})
            globals()[queryKey] = klass
            yield klass(
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = queryKey,
                queryCmd = queryCmd,
                runDesc = self.runDesc,  
                dependQry = dependQry

    def run(self):

class BDX_Query_Main(SQLTask):
    acctDate = luigi.Parameter(default=acctDate)
    ssisDate = luigi.Parameter(default=ssisDate)  # one month lag/later than acctDate
    # runDesc = globals().runDesc

    trans_id = "09000_Metaclass test" + "__" + runDesc9xx  # static.

    def requires(self):
        YY = self.acctDate[:4]
        MM = self.acctDate[4:6]
        acctDate = self.acctDate
        ssisDate = self.ssisDate

        return [BDX_Query_0XX( acctDate=self.acctDate, ssisDate = self.ssisDate, runDesc = runDesc0xx )

    def run(self):

if __name__ == '__main__':


DEBUG: Checking if BDX_Query_Main(acctDate=201904, ssisDate=201905) is complete
DEBUG: Checking if BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=prod period 4 test2) is complete
INFO: Informed scheduler that task   BDX_Query_Main_201904_201905_444c47aebc   has status   PENDING
DEBUG: BDX_Task.__init__ called for queryKey ="BDX010"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX020"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022a"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022b"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022c"
DEBUG: Checking if BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=prod period 4 test2, dependQry=) is complete
DEBUG: Checking if BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql"  , runDesc=prod period 4 test2, dependQry=BDX010) is complete
DEBUG: Checking if BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql"  -S LWVPDBSQLC070 , runDesc=prod period 4 test2, dependQry=BDX020) is complete
DEBUG: Checking if BDX022b(acctDate=201904, ssisDate=201905, queryKey=BDX022b, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 020.sql"  -S LWVPDBSQLC070  -v Year1=2019 MM=04, runDesc=prod period 4 test2, dependQry=BDX022a) is complete
DEBUG: Checking if BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql"  -v Year=2019 Month=04, runDesc=prod period 4 test2, dependQry=BDX022b) is complete
INFO: Informed scheduler that task   BDX_Query_0XX_201904_prod_period_4_te_201905_73ccfa7be3   has status   PENDING
INFO: Informed scheduler that task   BDX022c_201904_BDX022b__r__1_SQL_BDX_SQ_5c6660ab25   has status   PENDING
INFO: Informed scheduler that task   BDX022b_201904_BDX022a__r__1_SQL_BDX_SQ_c0677e7954   has status   PENDING
INFO: Informed scheduler that task   BDX022a_201904_BDX020__r__1_SQL_BDX_SQ_784cf5b40a   has status   PENDING
INFO: Informed scheduler that task   BDX020_201904_BDX010__r__1_SQL_BDX_SQ_d37e4e46a2   has status   PENDING
INFO: Informed scheduler that task   BDX010_201904___r__1_SQL_BDX_SQ_9d353a8cd3   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 5 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 7
INFO: Worker Worker(salt=751624561, workers=5, host=LWVPWEACT001, username=i805649, pid=4108) was stopped. Shutting down Keep-Alive thread
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1664, in <module>
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1658, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1068, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "R:/1.PY/DataPipeLine/run_test.py", line 178, in <module>
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 192, in run
    return _run(*args, **kwargs)['success']
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 209, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 172, in _schedule_and_run
    success &= worker.run()
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\worker.py", line 1184, in run
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\worker.py", line 996, in _run_task
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
**_pickle.PicklingError: Can't pickle <class 'abc.BDX010'>: attribute lookup BDX010 on abc failed**

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input

klass = type(queryKey, (BDX_Task,),{'__module__':__name__})



# Run this first outside any other logic so it gets run if someone imports the module:
for queryKey in cmdList.keys():
    globals()[queryKey] = type(queryKey,(BDX_Task,){'__module__':__name__})

#Then you requires function can look like:
class BDX_Query_0XX(SQLTask):

    # ...

    def requires(self):
        for queryKey, (queryCmd, dependQry) in cmdList.items():
            yield globals()[queryKey](
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = queryKey,
                queryCmd = queryCmd,
                runDesc = self.runDesc,  
                dependQry = dependQry

