我正在尝试使用气流和火花设置ETL。
我有一个XCom变量fs_etl_conf
中的数据,该变量通过字典推送到XCom:
{'spark.conf.etl.fsname': 'ffff',
'spark.conf.etl.fstype': 'fffff', 'spark.conf.etl.dryrun': 'false',
'spark.conf.etl.input.file': '/path/to/file',
'spark.conf.etl.input.mode': 'FAILFAST',
'spark.conf.etl.output.file': '/path/to/file',
'spark.conf.etl.output.format': 'parquet',
'spark.conf.etl.output.mode': 'ign',
'spark.conf.etl.output.repartition': '',
'spark.conf.etl.output.partitionby': '',
'spark.conf.etl.output.orderby': ''}
但是当我试图把这句话推到我的SparkSubmitOperator
里时:
SparkSubmitOperator(conf={{ 'ti.xcom_pull(key="fs_etl_conf")' }},
conn_id='spark', files=None, java_class='class.main'
name="jobname",
application=Variable.get('SPARK_ETL_URL'),
task_id='spark_etl',
dag=dag
)
问题是conf
接受dict类型而不是字符串,但我找不到一种正确的方法将xcom值作为dict而不是字符串来提取,然后我就遇到了问题,因为在命令行的构造中,它需要dict而不是字符串
我已经制作了一个自定义SparkOperator,将其放入
plugins/operators
中,并使用它代替常规SparkOperator,我无法将其链接到我的git repo,但这里它有点兼容版本:相关问题 更多 >
编程相关推荐