我在哪里和什么地方模仿气流PythonOperator
,这样:
python_callback
引发异常,触发on_failure_callback
的调用,并且我尝试在很多地方嘲笑{python可调用的}和{
代码文件如下所示:
dags/车型年款_代码.py
class CustomException(Exception): pass
def a_callable():
if OurSqlAlchemyTable.count() == 0:
raise CustomException("{} is empty".format(OurSqlAlchemyTable.name))
return True
def a_failure_callable(context):
SlackWebhookHook(
http_conn_id=slack_conn_id,
message= context['exception'].msg,
channel='#alert-channel'
).execute()
dags/a_日期
^{pr2}$dags/测试_日期
class TestCallback(unittest.TestCase):
def test_on_failure_callback(self):
tested_task = DagBag().get_dag('dag-named-sue').get_task('new-task')
with patch('airflow.operators.python_operator.PythonOperator.execute') as mock_execute:
with patch('dags.a_dag.a_failure_callable') as mock_callback:
mock_execute.side_effect = CustomException
tested_task.execute(context={})
# does failure of the python_callable trigger the failure callback?
mock_callback.assert_called()
# did the exception message make it to the failure callback?
failure_context = mock_callback.call_args[0]
self.assertEqual(failure_context['exception'].msg,
'OurSqlAlchemyTable is empty')O
测试确实在self.task.execute(context={})
行引发CustomException
——但是,在测试代码本身中。我想要的是错误
在气流代码中引发,使PythonOperator
失败并调用on_failure_callback
。在
我尝试过任何数量的排列,都不是在测试中提升,就是没有 触发、调用python_callable或找不到要修补的对象:
patch('dags.a_dag.a_callable') as mock_callable
'a_dag.a_callable'
'dags.my_code.a_callable'
'my_code.a_callable'
'airflow.models.Task.execute'
(Python3
,pytest
,和mock
。)
我遗漏了什么/做错了什么?
(更好的是,我想验证传递给SlackWebhookHook
的参数。比如:
with patch('???.SlackWebhookHook.execute') as mock_webhook:
... as above ...
kw_dict = mock_webhook.call_args[-1]
assert kw_dict['http_conn_id'] == slack_conn_id
assert kw_dict['message'] == 'OurSqlAlchemyTable is empty'
assert kw_dict['channel'] == '#alert-channel'
(但我首先将重点放在测试失败回调上。)
提前谢谢你。在
目前没有回答
相关问题 更多 >
编程相关推荐