如何使用OracleHook执行查询

2024-10-06 11:29:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要执行一个更新查询(Oracle),该查询包含来自xcom的一些参数,因此我需要使用带有Oracle钩子的PythonOperator(我不确定,但我不能使用Oracle操作符)

我已经对select查询执行了相同的操作,如:

oracle_db = OracleHook(oracle_conn_id)
query = f"SELECT count(1) AS NB FROM mag;"
result = execute_get_first(oracle_db, query) <-- this working well


def execute_get_first(db, query):
  result = None
  try:
      result = db.get_first(query) <-- db.execut() not exist
  except Exception as e:
      logging.error(e)
  return result

我的问题是,对于一个Oracle钩子,它是否存在像get_first这样的方法,而不是执行像update或insert这样的查询


Tags: executedb参数getresultconnqueryselect
1条回答
网友
1楼 · 发布于 2024-10-06 11:29:40

OracleHook继承自DbApiHook,因此DbApiHook的所有方法都可以使用,包括get_firstinsert_rows和其他方法

用法示例:

from airflow.decorators import task
from airflow.providers.oracle.hooks.oracle import OracleHook

@task
def task_1():
    hook = OracleHook(oracle_conn_id="oracle_conn_id")
    rows = [
        (
            "something",
            "something2",
        )
    ]
    target_fields = [
        'col',
        'col2',
    ]
    hook.insert_rows('mag', rows, target_fields)
    value = hook.get_first(sql="SELECT count(1) AS NB FROM mag")
    return value

相关问题 更多 >