如何模拟pythonooperator的python可调用和on-failure\u回调?

2024-10-02 16:24:19 发布

您现在位置:Python中文网/ 问答频道 /正文

我在哪里和什么地方模仿气流PythonOperator,这样:

  • python_callback引发异常,触发on_failure_callback的调用,并且
  • 我可以测试该回调是否被调用,以及使用什么参数?在

我尝试在很多地方嘲笑{python可调用的}和{},但都没有成功。在

代码文件如下所示:

dags/车型年款_代码.py

class CustomException(Exception): pass

def a_callable():
    if OurSqlAlchemyTable.count() == 0:
        raise CustomException("{} is empty".format(OurSqlAlchemyTable.name))
    return True

def a_failure_callable(context):
    SlackWebhookHook(
        http_conn_id=slack_conn_id,
        message= context['exception'].msg,
        channel='#alert-channel'
        ).execute()

dags/a_日期

^{pr2}$

dags/测试_日期

class TestCallback(unittest.TestCase):

    def test_on_failure_callback(self):
        tested_task = DagBag().get_dag('dag-named-sue').get_task('new-task')

        with patch('airflow.operators.python_operator.PythonOperator.execute') as mock_execute:
            with patch('dags.a_dag.a_failure_callable') as mock_callback:
                mock_execute.side_effect = CustomException
                tested_task.execute(context={})

            # does failure of the python_callable trigger the failure callback?
        mock_callback.assert_called()

            # did the exception message make it to the failure callback?
        failure_context = mock_callback.call_args[0]
        self.assertEqual(failure_context['exception'].msg,
                        'OurSqlAlchemyTable is empty')O

测试确实在self.task.execute(context={})行引发CustomException——但是,在测试代码本身中。我想要的是错误 在气流代码中引发,使PythonOperator失败并调用on_failure_callback。在

我尝试过任何数量的排列,都不是在测试中提升,就是没有 触发、调用python_callable或找不到要修补的对象:

patch('dags.a_dag.a_callable') as mock_callable
      'a_dag.a_callable'
      'dags.my_code.a_callable'
      'my_code.a_callable'
      'airflow.models.Task.execute'

Python3pytest,和mock。)

我遗漏了什么/做错了什么?

(更好的是,我想验证传递给SlackWebhookHook的参数。比如:

with patch('???.SlackWebhookHook.execute') as mock_webhook:
    ... as above ...

kw_dict = mock_webhook.call_args[-1]
assert kw_dict['http_conn_id'] == slack_conn_id
assert kw_dict['message'] == 'OurSqlAlchemyTable is empty'
assert kw_dict['channel'] == '#alert-channel'

(但我首先将重点放在测试失败回调上。)

提前谢谢你。在


Tags: idtaskexecutefailureascontextcallbackchannel