我有以下DAG,它使用一个专用于数据预处理例程的类来执行不同的方法:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
from table_builder import OnlineOfflinePreprocess
else:
print('Define MARKETING_PREPROC_PATH value in environmental variables')
sys.exit(1)
default_args = {
'start_date': datetime.now(),
'max_active_runs': 1,
'concurrency': 4
}
worker = OnlineOfflinePreprocess()
DAG = DAG(
dag_id='marketing_data_preproc',
default_args=default_args,
start_date=datetime.today()
)
import_online_data = PythonOperator(
task_id='import_online_data',
python_callable=worker.import_online_data,
dag=DAG)
import_offline_data = PythonOperator(
task_id='import_offline_data',
python_callable=worker.import_offline_data,
dag=DAG)
merge_aurum_to_sherlock = PythonOperator(
task_id='merge_aurum_to_sherlock',
python_callable=worker.merge_aurum_to_sherlock,
dag=DAG)
merge_sherlock_to_aurum = PythonOperator(
task_id='merge_sherlock_to_aurum',
python_callable=worker.merge_sherlock_to_aurum,
dag=DAG)
upload_au_to_sh = PythonOperator(
task_id='upload_au_to_sh',
python_callable=worker.upload_table,
op_args='aurum_to_sherlock',
dag=DAG)
upload_sh_to_au = PythonOperator(
task_id='upload_sh_to_au',
python_callable=worker.upload_table,
op_args='sherlock_to_aurum',
dag=DAG)
import_online_data >> merge_aurum_to_sherlock
import_offline_data >> merge_aurum_to_sherlock
merge_aurum_to_sherlock >> merge_sherlock_to_aurum
merge_aurum_to_sherlock >> upload_au_to_sh
merge_sherlock_to_aurum >> upload_sh_to_au
这将产生以下错误:
^{pr2}$考虑到flow的工作原理,这一点实际上非常明显:调用的不同类方法的输出不会存储到在图的顶部初始化的全局类对象。在
我能用XCom
来解决这个问题吗?总的来说,如何将OOP的一致性与气流结合起来是什么想法?在
这与OOP和flow无关,而与state和flow有关。在
需要在任务之间传递的任何状态都需要持久地存储。这是因为每个气流任务都是一个独立的进程(甚至可以在不同的机器上运行!)因此,存储器中的通信是不可能的。在
您是正确的,您可以使用XCOM来传递此状态(如果它很小,因为它存储在气流数据库中)。如果它很大,您可能希望将其存储在其他地方,可能是文件系统、S3或HDFS或专门的数据库。在
相关问题 更多 >
编程相关推荐