在apacheai中运行sparksubmit需要帮助吗

2024-09-30 01:21:59 发布

您现在位置:Python中文网/ 问答频道 /正文

我是Python和flow的相对新用户,在让spark-submit在flow任务中运行非常困难。我的目标是让下面的DAG任务成功运行

from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'matthew',
    'start_date': datetime(2019, 7, 8)
}

dag = DAG('CustomCreate_test2',
          default_args=default_args,
          schedule_interval=timedelta(days=1))

t3 = BashOperator(
    task_id='run_test',
    bash_command='spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar',
    dag=dag
)

我知道问题出在气流而不是bash,因为当我在终端中运行spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar命令时,它会成功运行。在

我从气流记录中得到以下错误

^{pr2}$

我也尝试过使用SparkSubmitOperator(...),但是没有成功地使用它,我只得到了如下错误日志

...
[2019-08-28 15:54:49,749] {logging_mixin.py:95} INFO - [[34m2019-08-28 15:54:49,749[0m] {[34mspark_submit_hook.py:[0m427} INFO[0m - at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)[0m
[2019-08-28 15:54:49,803] {taskinstance.py:1047} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--num-executors', '2', '--total-executor-cores', '1', '--executor-cores', '1', '--executor-memory', '2g', '--driver-memory', '1g', '--name', 'CustomCreate', '--class', 'CLASSPATH.CustomCreate', '--verbose', '--queue', 'root.default', '--deploy-mode', 'cluster', '~/IdeaProjects/custom-create-job/build/libs/custom-create.jar']. Error code is: 1.
...

在我可以在BashOperator(...)任务中运行spark-submit ...命令之前,我需要使用SparkSubmitOperator(...)做些什么吗?在

有没有办法直接从SparkSubmitOperator(...)任务运行我的spark-submit命令?在

在Airflow的管理->连接页面中,我有什么要做的吗?在

在Airflow的管理->用户页面中有什么必须设置的吗? 有什么必须设置为允许气流运行spark或运行由特定用户创建的jar文件吗?如果是,什么/如何?在


Tags: 用户fromimportbashdefaultdatetimecreatecustom
2条回答

类似的问题已经得到了回答- StackOverFlow Link

我想上面的链接会对你有所帮助。在

将来,如果您希望在AWS EMRAZURE上实现相同的功能,那么您有一个很好的方法来安排spark作业-Airflow Documentation

以上示例-(AWS EMR)

 <airflow_EMR_task> =cover_open(json.load(open(airflow_home+'/<tasks_json_containing_all_spark_configurations>')))
 <airflow_EMR_task>['Job']['Name'] =  <airflow_EMR_task>['Job']['Name'] + <'optional_postfix'>
airflow_swperformance_cpu_creator = EmrRunJobFlowOperator(
    task_id='<task_id>',
    job_flow_overrides= <airflow_EMR_task>['Job'],
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    retries=1,
    dag=dag
)

一个简单的JSON将是-(与上面提到的相同的JSON文件)

^{pr2}$

就这些了。在

我找到了一个解决这个问题的方法。在

创建一个新的ssh连接(或编辑默认值),如下所示在AirflowAdmin->connection页面中 Airflow SSH Connection Example

如果您看不到图像,以下是文本版本
Conn ID:ssh\u连接
连接类型:SSH
主机:主机IP地址
用户名:主机用户名
密码:主机密码
端口:
Extra:{“key_file”:“/PATH TO HOME DIR/flow/.ssh/id_rsa”,“允许主机密钥更改”:“true”,“无主机密钥检查”:“true”}

然后对python脚本进行适当的调整

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'matthew',
    'start_date': datetime(2019, 8, 28)
}

dag = DAG('custom-create',
          default_args=default_args,
          schedule_interval=timedelta(days=1),
          params={'project_source': '~/IdeaProjects/custom-create-job',
                  'spark_submit': '/usr/local/bin/spark-submit',
                  'classpath': 'CLASSPATH.CustomCreate',
                  'jar_file': 'build/libs/custom-create.jar'}
          )

templated_bash_command = """
    echo 'HOSTNAME: $HOSTNAME' #To check that you are properly connected to the host
    cd {{ params.project_source }}
    {{ params.spark_submit }}  class {{ classpath }} {{ jar_file }}
"""

t1 = SSHOperator(
    task_id="SSH_task",
    ssh_conn_id='ssh_connection',
    command=templated_bash_command,
    dag=dag
)

我希望这个解决方案能帮助其他可能遇到类似问题的人。在

相关问题 更多 >

    热门问题