I have an Airflow (v1.10.12) DAG that triggers a SageMaker Processor job as one of its tasks. I have written a few tests (pytest 6.2.2) to check the basic sanity of the DAG.
It seems that merely fetching the DAG by id from the `DagBag` triggers the SageMaker job, i.e. when I run `pytest test_file_name.py`, a job is started, which is not what I want.
```python
from airflow.models import DagBag


class TestSagemakerDAG:
    @classmethod
    def setup_class(cls):
        # Parse the DAG folder once per test class and reuse the same DagBag
        # for the lookup (calling DagBag() a second time would re-parse every file).
        cls.dagbag = DagBag()
        cls.dag = cls.dagbag.get_dag(dag_id='sagemaker-processor')

    def test_dag_loaded(self):
        """
        Verify that the DAGs are loaded into the DagBag.
        :return:
        """
        assert self.dagbag.import_errors == {}
        assert self.dag is not None
        assert len(self.dag.tasks) == 2
```
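It is probably building the `DagBag`, not the `get_dag` lookup, that matters here. As far as I understand, `DagBag()` collects DAGs by importing each file in the DAG folder as a Python module, so every module-level statement in those files, including a function call, runs during collection. The stdlib-only sketch below imitates that with `importlib`; the file name `fake_dag.py`, `start_job`, and `JOB_LOG` are hypothetical, not Airflow APIs.

```python
import importlib.util
import os
import tempfile
import textwrap

# Hypothetical DAG-like file: it calls a "job" function at module level.
DAG_SOURCE = textwrap.dedent(
    """
    JOB_LOG = []

    def start_job():
        JOB_LOG.append("job started")

    start_job()  # module-level call: runs as soon as the file is imported
    """
)


def load_file_as_module(path, name):
    """Import a file by path, roughly what a DAG-folder scan does."""
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # executes every top-level statement
    return module


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "fake_dag.py")
    with open(path, "w") as f:
        f.write(DAG_SOURCE)
    mod = load_file_as_module(path, "fake_dag")

print(mod.JOB_LOG)  # ['job started']: merely loading the file started the "job"
```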
To be clearer, this is what the SageMaker Processor job definition (sagemaker 2.24.1) looks like:
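```python
from sagemaker.processing import Processor


# Method excerpted from a helper class (the class itself is not shown).
def initiate_sage_maker_job(self, session):
    # Builds the Processor and immediately calls .run(), which submits the job.
    return Processor(
        image_uri=self.ecr_uri,
        role=self.iam_role,
        instance_count=self.instance_count,
        instance_type=self.instance_type,
        base_job_name=self.processor_name,
        sagemaker_session=session,
    ).run()
```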
and the boto3 (v1.16.63) session is created as:
```python
import boto3
import sagemaker


# Method excerpted from the same helper class.
def get_session(self):
    boto_session = boto3.session.Session()
    client = boto_session.client('sagemaker', region_name=self.region)
    session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=client)
    return session
```
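`Processor(...).run()` sends `CreateProcessingJob` as soon as `initiate_sage_maker_job` is called (the traceback further down goes `run` → `start_new` → `process` → `create_processing_job`). One way to keep both the session and the job out of DAG-parsing time is to wrap them in a zero-argument function that only runs when the task executes. This is a stdlib-only sketch under that assumption: `FakeSession`, `FakeProcessor`, `initiate_job`, and `run_processing_job` are hypothetical stand-ins that mimic only the shape of the sagemaker calls above.

```python
# Hypothetical stand-ins: they mimic only the shape of
# sagemaker.session.Session and sagemaker.processing.Processor.
started_jobs = []


class FakeSession:
    """Stand-in for a SageMaker session; creating it does nothing here."""


class FakeProcessor:
    def __init__(self, base_job_name, sagemaker_session):
        self.base_job_name = base_job_name
        self.sagemaker_session = sagemaker_session

    def run(self):
        # In the real SDK this is where CreateProcessingJob is sent.
        started_jobs.append(self.base_job_name)


def get_session():
    return FakeSession()


def initiate_job(session):
    return FakeProcessor(base_job_name="airflow-ecr-test",
                         sagemaker_session=session).run()


def run_processing_job():
    """Zero-argument task callable: session and job are created only when called."""
    return initiate_job(get_session())


# Defining or passing the wrapper around starts nothing...
task_callable = run_processing_job
assert started_jobs == []
# ...the job starts only when the task body actually executes.
task_callable()
print(started_jobs)  # ['airflow-ecr-test']
```

A function object like `run_processing_job` (no parentheses) is the kind of value a task's callable parameter expects.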
Finally, the DAG itself:
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

with DAG('sagemaker-processor',
         default_args=default_args,
         schedule_interval='@hourly',
         ) as dag:
    t1 = BashOperator(
        task_id='print_current_date',
        bash_command='date'
    )
    t2 = PythonOperator(
        task_id='sagemaker_trigger', python_callable=initiate_sage_maker_job()
    )
    t1 >> t2
```
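In the DAG above, `python_callable=initiate_sage_maker_job()` has parentheses, so Python calls the function while evaluating the `PythonOperator(...)` arguments, i.e. every time the file is parsed, and hands the operator the function's return value instead of the function itself. The toy example below contrasts the two forms; `FakePythonOperator` and `initiate_job` are hypothetical stand-ins, not Airflow's classes.

```python
calls = []


def initiate_job():
    """Hypothetical job launcher; records each launch instead of calling AWS."""
    calls.append("CreateProcessingJob")
    return None  # a function that only starts a job returns nothing


class FakePythonOperator:
    """Hypothetical stand-in for an operator: stores a callable, runs it later."""

    def __init__(self, task_id, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable

    def execute(self):
        return self.python_callable()


# With parentheses: initiate_job runs NOW, while the file is being parsed,
# and the operator receives its return value (None) instead of a function.
eager = FakePythonOperator("eager", python_callable=initiate_job())
print(len(calls), eager.python_callable)  # 1 None

# Without parentheses: only the function object is stored; nothing runs yet.
lazy = FakePythonOperator("lazy", python_callable=initiate_job)
print(len(calls))  # 1
lazy.execute()     # the job runs only when the task executes
print(len(calls))  # 2
```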
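I am only trying to import the DAGs from the folder, check for import errors, and check the upstream and downstream task lists. I have also made sure the DAG is turned off in the Airflow UI and that `airflow scheduler` is not running to queue tasks. These really are just the standard tests I want to run with pytest.

The problem that shows up is the following: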
```
Job Name: airflow-ecr-test-2021-02-26-21-10-39-935
Inputs: []
Outputs: []
[2021-02-26 16:10:39,935] {session.py:854} INFO - Creating processing-job with name airflow-ecr-test-2021-02-26-21-10-39-935
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/Ajeya.Kempegowda/dags/sample_mwaa.py", line 31, in <module>
    initiate_sage_maker_job()
  File "/Users/Ajeya.Kempegowda/anaconda3/envs/airflow/lib/python3.7/site-packages/sagemaker/processing.py", line 180, in run
    experiment_config=experiment_config,
  File "/Users/Ajeya.Kempegowda/anaconda3/envs/airflow/lib/python3.7/site-packages/sagemaker/processing.py", line 695, in start_new
    processor.sagemaker_session.process(**process_args)
  File "/Users/Ajeya.Kempegowda/anaconda3/envs/airflow/lib/python3.7/site-packages/sagemaker/session.py", line 856, in process
    self.sagemaker_client.create_processing_job(**process_request)
  File "/Users/Ajeya.Kempegowda/anaconda3/envs/airflow/lib/python3.7/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/Users/Ajeya.Kempegowda/anaconda3/envs/airflow/lib/python3.7/site-packages/botocore/client.py", line 676, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ExpiredTokenException) when calling the CreateProcessingJob operation: The security token included in the request is expired
```
The error shown is an expired token (`ExpiredTokenException`), but note that it is raised by the call that creates the job itself.
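The traceback reports the call as `line 31, in <module>` of `sample_mwaa.py`, meaning `initiate_sage_maker_job()` was invoked at module level while the file itself was executing, not from inside a running task. If a module-level call is only meant for trying the job by hand, the usual Python idiom is to put it under `if __name__ == "__main__":`, because importing a file gives the module a name other than `__main__`. The stdlib-only sketch below (the file name `guarded_dag.py`, `initiate_job`, and `LAUNCHES` are hypothetical) shows the guarded call is skipped on import but runs when the file is executed as a script.

```python
import importlib.util
import os
import runpy
import tempfile
import textwrap

# Hypothetical DAG file whose manual-test call is guarded.
GUARDED_SOURCE = textwrap.dedent(
    """
    LAUNCHES = []

    def initiate_job():
        LAUNCHES.append("job")

    if __name__ == "__main__":
        initiate_job()  # only when the file is run directly as a script
    """
)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "guarded_dag.py")
    with open(path, "w") as f:
        f.write(GUARDED_SOURCE)

    # Imported like a DAG-folder scan would: __name__ is "guarded_dag", no launch.
    spec = importlib.util.spec_from_file_location("guarded_dag", path)
    imported = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(imported)

    # Executed as a script: __name__ is "__main__", so the job is launched.
    script_globals = runpy.run_path(path, run_name="__main__")

print(imported.LAUNCHES)           # []
print(script_globals["LAUNCHES"])  # ['job']
```

This guard does not cover a call written inside the `PythonOperator(...)` arguments; that expression is evaluated during import either way.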
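Is there something obvious I am missing when testing Airflow? My understanding is that `airflow scheduler` should queue the DAG, and tasks should only be triggered when I tell it to run them (Turn on dag on Airflow UI/CLI).

Any help would be greatly appreciated. Thanks!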