Using a dict for the SparkSubmitOperator conf parameter

Posted 2024-09-30 01:28:51


I'm trying to set up an ETL pipeline with Airflow and Spark.
I have data in an XCom variable fs_etl_conf, which was pushed to XCom as a dictionary:

{'spark.conf.etl.fsname': 'ffff',
 'spark.conf.etl.fstype': 'fffff',
 'spark.conf.etl.dryrun': 'false',
 'spark.conf.etl.input.file': '/path/to/file',
 'spark.conf.etl.input.mode': 'FAILFAST',
 'spark.conf.etl.output.file': '/path/to/file',
 'spark.conf.etl.output.format': 'parquet',
 'spark.conf.etl.output.mode': 'ign',
 'spark.conf.etl.output.repartition': '',
 'spark.conf.etl.output.partitionby': '',
 'spark.conf.etl.output.orderby': ''}
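
For context, a minimal sketch of how such a dict could be pushed to XCom from an upstream task; the build_conf callable and the push_conf task id are illustrative names, not from the question:

from airflow.operators.python_operator import PythonOperator

def build_conf(**context):
    # Push the ETL configuration dict to XCom under an explicit key
    conf = {'spark.conf.etl.fsname': 'ffff',
            'spark.conf.etl.input.file': '/path/to/file'}
    context['ti'].xcom_push(key='fs_etl_conf', value=conf)

push_conf = PythonOperator(task_id='push_conf', python_callable=build_conf,
                           provide_context=True, dag=dag)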

But when I try to feed this into my SparkSubmitOperator:

SparkSubmitOperator(conf="{{ ti.xcom_pull(key='fs_etl_conf') }}",
                    conn_id='spark', files=None, java_class='class.main',
                    name="jobname",
                    application=Variable.get('SPARK_ETL_URL'),
                    task_id='spark_etl',
                    dag=dag
                    )

The problem is that conf expects a dict, not a string, and I can't find a proper way to pull the XCom value as a dict rather than a string: Jinja renders templated fields to strings, so when the operator builds the spark-submit command line it receives a string where it needs a dict and fails.
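
To see the mismatch concretely: in Airflow 1.x every templated field is rendered to a plain string, so conf arrives as the dict's repr. A quick illustrative snippet (values made up) showing that ast.literal_eval can safely turn that string back into a dict, which is the idea the answer below builds on:

import ast

# What the Jinja-rendered conf actually is: a str, not a dict
rendered = "{'spark.conf.etl.fsname': 'ffff', 'spark.conf.etl.dryrun': 'false'}"
conf = ast.literal_eval(rendered)  # parse the Python literal back into a dict
assert isinstance(conf, dict)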


Tags: to, path, string, input, output, mode, conf, etl
1 Answer

#1 · Posted 2024-09-30 01:28:51

I made a custom Spark operator, put it in plugins/operators, and use it instead of the regular SparkSubmitOperator. I can't link to my git repo, but here is a roughly compatible version of it:

import ast
from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook
from airflow.models import BaseOperator
from airflow.settings import WEB_COLORS
from airflow.utils.decorators import apply_defaults

class BetterSparkOperator(BaseOperator):
    """Drop-in replacement for SparkSubmitOperator that parses templated
    string fields (conf, application_args) back into Python objects."""

    # Every field listed here is rendered by Jinja before execute() runs,
    # which is why conf and application_args arrive as strings.
    template_fields = ('_application', '_conf', '_files', '_py_files', '_jars',
                       '_driver_class_path', '_packages', '_exclude_packages',
                       '_keytab', '_principal', '_proxy_user', '_name',
                       '_application_args', '_env_vars', '_total_executor_cores',
                       '_executor_cores', '_executor_memory', '_driver_memory',
                       '_num_executors')
    ui_color = WEB_COLORS['LIGHTORANGE']

    @apply_defaults
    def __init__(self, application='', conf=None, conn_id='spark_default',
                 files=None, py_files=None, archives=None, driver_class_path=None,
                 jars=None, java_class=None, packages=None, exclude_packages=None,
                 repositories=None, total_executor_cores=None, executor_cores=None,
                 executor_memory=None, driver_memory=None, keytab=None, principal=None,
                 proxy_user=None, name='airflow-spark', num_executors=None, application_args=None,
                 env_vars=None, verbose=False, spark_binary="spark-submit", *args, **kwargs):
        super(BetterSparkOperator, self).__init__(*args, **kwargs)
        self._application = application
        self._conf = conf
        self._files = files
        self._py_files = py_files
        self._archives = archives
        self._driver_class_path = driver_class_path
        self._jars = jars
        self._java_class = java_class
        self._packages = packages
        self._exclude_packages = exclude_packages
        self._repositories = repositories
        self._total_executor_cores = total_executor_cores
        self._executor_cores = executor_cores
        self._executor_memory = executor_memory
        self._driver_memory = driver_memory
        self._num_executors = num_executors
        self._keytab = keytab
        self._principal = principal
        self._proxy_user = proxy_user
        self._name = name
        self._application_args = application_args
        self._env_vars = env_vars
        self._verbose = verbose
        self._spark_binary = spark_binary
        self._hook = None
        self._conn_id = conn_id

    def execute(self, context):
        """
        Call the SparkSubmitHook to run the provided spark job
        """
        self.log.info("Rendered application_args: %s", self._application_args)
        self._hook = SparkSubmitHook(
            # conf and application_args arrive as Jinja-rendered strings;
            # parse them back into Python objects before building the command.
            conf=BetterSparkOperator.convert_str_to_value(self._conf), conn_id=self._conn_id,
            files=self._files, py_files=self._py_files, archives=self._archives,
            driver_class_path=self._driver_class_path, jars=self._jars,
            java_class=self._java_class, packages=self._packages,
            exclude_packages=self._exclude_packages, repositories=self._repositories,
            total_executor_cores=self._total_executor_cores, executor_cores=self._executor_cores,
            executor_memory=self._executor_memory, driver_memory=self._driver_memory,
            keytab=self._keytab, principal=self._principal, proxy_user=self._proxy_user,
            name=self._name, num_executors=self._num_executors,
            application_args=BetterSparkOperator.convert_str_to_value(self._application_args),
            env_vars=self._env_vars, verbose=self._verbose, spark_binary=self._spark_binary
        )
        self._hook.submit(self._application)

    def on_kill(self):
        # The hook only exists once execute() has started the job
        if self._hook:
            self._hook.on_kill()

    @staticmethod
    def convert_str_to_value(input_str):
        """Parse a Jinja-rendered string such as '{...}' or '[...]' back into
        the Python literal it represents; pass non-strings through untouched."""
        if not input_str:
            return None
        if isinstance(input_str, str):
            # literal_eval only accepts Python literals, so arbitrary code in
            # the rendered template cannot be executed here.
            return ast.literal_eval(input_str)
        return input_str
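
A hypothetical usage sketch inside a DAG; the import path depends on how your plugins folder is wired, and the connection id, class name and Variable name simply mirror the question:

from airflow.models import Variable
from operators.better_spark_operator import BetterSparkOperator

spark_etl = BetterSparkOperator(
    task_id='spark_etl',
    # Rendered by Jinja to the dict's repr (a string), then parsed back in execute()
    conf="{{ ti.xcom_pull(key='fs_etl_conf') }}",
    conn_id='spark',
    java_class='class.main',
    name='jobname',
    application=Variable.get('SPARK_ETL_URL'),
    dag=dag,
)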
