Airflow triggers a SageMaker job in test mode

Published 2024-09-25 08:31:14


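I have an Airflow (v1.10.12) DAG that triggers a SageMaker Processor job as one of its tasks. I've written some tests (pytest 6.2.2) to check the basic sanity of the DAG. It seems that simply fetching the DAG by id from the DagBag triggers the SageMaker job, i.e. when I run pytest test_file_name.py, a job is started, which is not what I want.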

from airflow.models import DagBag

class TestSagemakerDAG:
    @classmethod
    def setup_class(cls):
        # Parse the DAG folder once and reuse the same DagBag for the lookup
        cls.dagbag = DagBag()
        cls.dag = cls.dagbag.get_dag(dag_id='sagemaker-processor')

    def test_dag_loaded(self):
        """
        Verify that the DAGs are loaded into the DagBag.
        """
        assert self.dagbag.import_errors == {}
        assert self.dag is not None
        assert len(self.dag.tasks) == 2

To make this clearer, here is what the SageMaker Processor job (sagemaker 2.24.1) definition looks like:

def initiate_sage_maker_job(self, session):
    return Processor(
        image_uri=self.ecr_uri,
        role=self.iam_role,
        instance_count=self.instance_count,
        instance_type=self.instance_type,
        base_job_name=self.processor_name,
        sagemaker_session=session,
    ).run()

and the boto3 (v1.16.63) session is created as:

def get_session(self):
    boto_session = boto3.session.Session()
    client = boto_session.client('sagemaker', region_name=self.region)
    session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=client)
    return session

Finally, the DAG itself:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

with DAG('sagemaker-processor',
         default_args=default_args,
         schedule_interval='@hourly',
         ) as dag:
    t1 = BashOperator(
        task_id='print_current_date',
        bash_command='date'
    )
    t2 = PythonOperator(
        task_id='sagemaker_trigger', python_callable=initiate_sage_maker_job()   
    )
    
    t1 >> t2
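A likely culprit is `python_callable=initiate_sage_maker_job()`: the parentheses call the function while the DAG file is being evaluated and hand its return value, not the function, to the operator. The traceback is consistent with this, since it points at line 31 of `sample_mwaa.py`, where `initiate_sage_maker_job()` runs at module level. The pure-Python sketch below illustrates the difference; `FakeOperator` and `launch_job` are hypothetical stand-ins, not Airflow or SageMaker APIs.

```python
# Hypothetical stand-ins: FakeOperator mimics an operator that stores a callable,
# launch_job mimics a function that starts a remote job. Neither is a real API.
calls = []


def launch_job():
    """Pretend to start a remote job and record that it happened."""
    calls.append("job started")
    return "job-id"


class FakeOperator:
    """Stores a callable and only invokes it when execute() is called."""

    def __init__(self, task_id, python_callable):
        if not callable(python_callable):
            raise TypeError(f"{task_id}: python_callable must be callable")
        self.task_id = task_id
        self.python_callable = python_callable

    def execute(self):
        return self.python_callable()


# Wrong: launch_job() runs right here, while the "DAG file" is being evaluated.
# Its return value (a string) then fails the callable check.
try:
    FakeOperator("sagemaker_trigger", python_callable=launch_job())
except TypeError as exc:
    print(exc)
print(calls)  # the job already ran during definition

calls.clear()

# Right: pass the function object; nothing runs until execute() is called.
task = FakeOperator("sagemaker_trigger", python_callable=launch_job)
print(calls)  # nothing has run yet
task.execute()
print(calls)
```

Applied to the DAG above, the change would be to pass a function object to `python_callable` (for instance a small hypothetical wrapper that calls `get_session()` and then `initiate_sage_maker_job(session)`) rather than the result of `initiate_sage_maker_job()`.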

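I'm only trying to import the DAGs from the folder, check for import errors, and check the upstream and downstream lists. I have also made sure the DAG is turned off in the Airflow UI and that I'm not running airflow scheduler to start queuing tasks. These are really just the standard tests I want to run with pytest.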

This is the error that comes up:

Job Name:  airflow-ecr-test-2021-02-26-21-10-39-935
Inputs:  []
Outputs:  []
[2021-02-26 16:10:39,935] {session.py:854} INFO - Creating processing-job with name airflow-ecr-test-2021-02-26-21-10-39-935
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/Ajeya.Kempegowda/dags/sample_mwaa.py", line 31, in <module>
    initiate_sage_maker_job()
  File "/Users/Ajeya.Kempegowda/anaconda3/envs/airflow/lib/python3.7/site-packages/sagemaker/processing.py", line 180, in run
    experiment_config=experiment_config,
  File "/Users/Ajeya.Kempegowda/anaconda3/envs/airflow/lib/python3.7/site-packages/sagemaker/processing.py", line 695, in start_new
    processor.sagemaker_session.process(**process_args)
  File "/Users/Ajeya.Kempegowda/anaconda3/envs/airflow/lib/python3.7/site-packages/sagemaker/session.py", line 856, in process
    self.sagemaker_client.create_processing_job(**process_request)
  File "/Users/Ajeya.Kempegowda/anaconda3/envs/airflow/lib/python3.7/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/Users/Ajeya.Kempegowda/anaconda3/envs/airflow/lib/python3.7/site-packages/botocore/client.py", line 676, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ExpiredTokenException) when calling the CreateProcessingJob operation: The security token included in the request is expired

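The error shown is an ExpiredTokenException, but it is raised while the processing job itself is being created, which means the job creation was actually attempted.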

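Is there something obvious I'm missing when testing Airflow? My understanding is that the Airflow scheduler should queue the DAG, and tasks should only be triggered when it is told to execute them (by turning the DAG on in the Airflow UI/CLI).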
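On the scheduler question: building a `DagBag` (in the scheduler, the webserver, or a test) imports each DAG file, so every top-level statement in it runs whether or not the DAG is paused. The sketch below imitates that with `importlib` on a throwaway file; the file name, module name, and file contents are made up for illustration and are only a rough analogue of how Airflow parses its DAG folder.

```python
import importlib.util
import pathlib
import tempfile

# A made-up DAG-like file whose module level has a side effect.
DAG_SOURCE = '''
SIDE_EFFECTS = []

def launch_job():
    SIDE_EFFECTS.append("job started")

RESULT = launch_job()  # executes as soon as the file is imported
'''


def load_module_from_path(path):
    """Import a Python file by path, roughly the way a DAG folder is parsed."""
    spec = importlib.util.spec_from_file_location("fake_dag_module", str(path))
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs all top-level statements
    return module


with tempfile.TemporaryDirectory() as tmp:
    dag_file = pathlib.Path(tmp) / "fake_dag.py"
    dag_file.write_text(DAG_SOURCE)
    module = load_module_from_path(dag_file)
    print(module.SIDE_EFFECTS)  # the "job" ran even though nothing was scheduled
```

Nothing in this sketch schedules or runs a task; the side effect comes purely from importing the file, which is why pausing the DAG does not prevent it.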

Any help would be appreciated. Thanks.

