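I'm stuck on how to continue with this task and could use some help. I'm creating a DAG in which the user passes parameters manually. The DAG should use a "pid" to terminate a hanging query in a Postgres database. I've written the code, but I can't pass the parameters through the CLI to test it. I'm using this command: `airflow tasks test killer_dag get_idle_queries 20210802 -t '{"pid":"12345"}`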
Here is the code:

`killer_dag.py`
```python
from datetime import datetime, timedelta  # needed for start_date, retry_delay and sla below

from airflow.models import DAG
from airflow.models.log import Log
from airflow.utils.db import create_session

from plugins.platform.utils import skyflow_email_list
from dags.utils.utils import kill_hanging_queries

# Query Airflow's Log table for "trigger" events of this DAG (results are not used below).
with create_session() as session:
    results = (
        session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner, Log.extra)
        .filter(Log.dag_id == "killer_dag", Log.event == "trigger")
        .order_by(Log.dttm.desc())
        .all()
    )

killer_dag = DAG(
    dag_id="killer_dag",
    default_args={
        "owner": "Data Intelligence: Data Platform",
        "email": skyflow_email_list,
        "email_on_failure": True,
        "email_on_retry": False,
        "depends_on_past": False,
        "start_date": datetime(2021, 8, 1, 0, 0, 0),
        "retries": 10,
        "retry_delay": timedelta(minutes=1),
        "sla": timedelta(minutes=90),
    },
    schedule_interval=timedelta(days=1),
)

# Attach the get_idle_queries task (defined in utils.py) to this DAG.
kill_hanging_queries(killer_dag)
```
And `utils.py`:
```python
import logging

from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import RealDictCursor
from plugins.platform.kw_postgres_hook import KwPostgresHook
from airflow.models import DagRun
from airflow.providers.postgres.hooks.postgres import PostgresHook


def get_idle_queries(**kwargs):
    logging.info("STARTING TO FETCH THE PID")
    logging.info(kwargs)
    pid = kwargs["pid"]
    logging.info(pid)
    logging.info("received pid: %s", pid)  # logging uses %-style placeholders
    # return 'Whatever you return gets printed in the logs'
    analdb_hook = KwPostgresHook(postgres_conn_id="anal_db")
    analdb_conn = analdb_hook.get_conn()
    analdb_cur = analdb_conn.cursor(cursor_factory=RealDictCursor)
    get_idle_queries_query = """
        SELECT pg_terminate_backend('{pid}');
    """
    analdb_cur.execute(get_idle_queries_query)
    hanging_queries = analdb_cur.fetchall()
    logging.info(f"Listing info about hanging queries {hanging_queries}")
    for record in hanging_queries:
        query = record["terminate_q"]
        logging.info(f"Running query: {query}")
        analdb_cur.execute(query)
    analdb_conn.close()


def kill_hanging_queries(killer_dag):
    # Register get_idle_queries as a task of the given DAG.
    PythonOperator(
        task_id="get_idle_queries",
        python_callable=get_idle_queries,
        dag=killer_dag,
        provide_context=True,
    )
```
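The way you pass parameters from the CLI is basically correct (unless the closing `'` really is missing from your command, as it is in the post above):

`airflow tasks test killer_dag get_idle_queries 20210802 -t '{"pid":"12345"}'`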
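To see what `-t` does with that string, here is a minimal, stdlib-only sketch. It is a simplified model, not Airflow's source: `merge_task_params` is a hypothetical helper, and it assumes, as I understand Airflow 2's `tasks test` command to behave, that the decoded JSON is merged into the task's `params`. Note also that without the closing quote, a POSIX shell such as bash never runs the command; it keeps waiting for more input.

```python
import json


def merge_task_params(task_params: dict, cli_json: str) -> dict:
    """Hypothetical helper modelling `airflow tasks test ... -t '<json>'`:
    decode the JSON string and overlay it on the task's existing params.
    The result corresponds to what the callable sees as context["params"]."""
    merged = dict(task_params)           # copy so the caller's dict is untouched
    merged.update(json.loads(cli_json))  # CLI values win on key clashes
    return merged


# The argument as the shell hands it over once the closing quote is present.
cli_value = '{"pid":"12345"}'
params = merge_task_params({}, cli_value)
print(params)                        # {'pid': '12345'}
print(type(params["pid"]).__name__)  # str, because "12345" is quoted in the JSON
```

Because the pid is quoted in the JSON, it arrives as a string, so it is worth converting it to `int` before using it in SQL.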
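So I think the problem in your code is how you access these parameters. In `get_idle_queries`, they are available under `kwargs["params"]`, so the pid is `kwargs["params"]["pid"]`. Let me know if this works for you.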
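Putting that together, a corrected callable might look like the sketch below. It assumes, as in the question, that `KwPostgresHook.get_conn()` returns a psycopg2 connection; `read_pid` and `TERMINATE_SQL` are hypothetical names introduced here. Besides reading from `kwargs["params"]`, it passes the pid as a bound query parameter (the original `'{pid}'` is not an f-string, so Postgres would receive the literal text `{pid}`) and drops the loop over `terminate_q`, since `pg_terminate_backend` returns a single boolean column and that key does not exist.

```python
import logging

# %s is a psycopg2 placeholder: the driver quotes the value, no string formatting.
TERMINATE_SQL = "SELECT pg_terminate_backend(%s);"


def read_pid(context: dict) -> int:
    """Return the pid from the Airflow context.

    Values sent with `airflow tasks test ... -t '{...}'` land in
    context["params"], not as top-level kwargs, so kwargs["pid"] fails.
    int() rejects anything that is not a whole number before it reaches SQL.
    """
    return int(context["params"]["pid"])


def get_idle_queries(**kwargs):
    pid = read_pid(kwargs)
    logging.info("received pid: %s", pid)

    # Imported lazily (assumption: same plugin as in the question) so that
    # read_pid can be exercised without Airflow or the plugin installed.
    from plugins.platform.kw_postgres_hook import KwPostgresHook

    conn = KwPostgresHook(postgres_conn_id="anal_db").get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(TERMINATE_SQL, (pid,))
            (terminated,) = cur.fetchone()  # single boolean column
        logging.info("pg_terminate_backend(%s) returned %s", pid, terminated)
        return terminated  # PythonOperator logs the returned value
    finally:
        conn.close()
```

You can then run it with the command above, e.g. `airflow tasks test killer_dag get_idle_queries 20210802 -t '{"pid":"12345"}'`.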