Subclassing OracleHook to set the cx_Oracle outputtypehandler because selecting a value raises an error

Posted 2024-10-06 11:28:52


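I am querying a very old table that holds a lot of data. Some of that data is corrupt, and I do not have permission to change the table or create a view. Whenever I select the data with the Airflow OracleHook get_records method, I get the error "ValueError: year -4712 is out of range". I would like to handle this by returning None for the affected records, as shown in this cx_Oracle solution: Problem empty date cause ValueError: year -9999 is out of range. That requires setting the cx_Oracle outputtypehandler attribute. When I run the code below, neither OutputHandler nor DateTimeConverter is ever called, and the code fails with the same ValueError I get from the base OracleHook class. Any help would be appreciated.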

from airflow.hooks.oracle_hook import OracleHook
import cx_Oracle

from datetime import datetime
import os
os.environ['NLS_DATE_FORMAT'] = 'YYYY-MM-DD HH24:MI:SS'

class OracleHookTypeHandler(OracleHook):
    def __init__(self, oracle_conn_id):
        OracleHook.__init__(self, oracle_conn_id)
        self.cursor = OracleHook.get_cursor
        self.cursor.outputtypehandler = self.OutputHandler

    # Dealing with invalid years in the database
    def DateTimeConverter(self, value):
        print('DateTimeConverter was called')
        if value.startswith('4712'):
            return None
        return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')

    def OutputHandler(self, cursor, name, defaulttype, length, precision, scale):
        print('OutputHandler was called')
        if defaulttype == cx_Oracle.DATETIME:
            return cursor.var(cx_Oracle.STRING, arraysize=cursor.arraysize,
                              outconverter=self.DateTimeConverter)

def extract(extract_connection):
    # Return the extracted records
    extract_records_query = 'SELECT col1, col2, col3 FROM table'
    o_extract_hook = OracleHookTypeHandler(oracle_conn_id=extract_connection)
    print('Extract started')
    extract_records = o_extract_hook.get_records(sql=extract_records_query)
    return extract_records
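The handler in the code above is probably never called for two reasons: `OracleHook.get_cursor` is assigned without being called, so `self.cursor` holds a function rather than a cursor, and `get_records` opens its own cursor anyway. The sketch below reproduces that behaviour with stand-in classes. `Cursor`, `BaseHook` and `BrokenHook` are hypothetical names used only for illustration; they are not part of Airflow or cx_Oracle.

```python
class Cursor:
    """Stand-in for a database cursor (hypothetical, not cx_Oracle)."""
    outputtypehandler = None


class BaseHook:
    """Stand-in for a hook whose get_records() opens its own cursor."""

    def get_cursor(self):
        return Cursor()

    def get_records(self):
        # A fresh cursor is created here, independently of get_cursor().
        cur = Cursor()
        return cur.outputtypehandler


class BrokenHook(BaseHook):
    def __init__(self):
        # No parentheses: this stores the function itself, not a cursor.
        self.cursor = BaseHook.get_cursor
        # Setting an attribute on a function object is legal Python,
        # but no cursor ever sees it.
        self.cursor.outputtypehandler = self.handler

    def handler(self, *args):
        return None


hook = BrokenHook()
is_function = callable(hook.cursor) and not isinstance(hook.cursor, Cursor)
handler_seen_by_query = hook.get_records()
print(is_function, handler_seen_by_query)
```

The cursor used inside `get_records` still has no handler attached, which matches the symptom described in the question.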

Update: I solved this using @joebeeson's answer below. Working code:

from airflow.hooks.oracle_hook import OracleHook
import cx_Oracle
import sys

from datetime import datetime
from contextlib import closing
import os
os.environ['NLS_DATE_FORMAT'] = 'YYYY-MM-DD HH24:MI:SS'

class OracleHookTypeHandler(OracleHook):
    def __init__(self, oracle_conn_id):
        OracleHook.__init__(self, oracle_conn_id)

    # Override get_records from inherited class dbapihook
    def get_records(self, sql, parameters=None):
        """
        Executes the sql and returns a set of records.

        :param sql: the sql statement to be executed (str) or a list of
            sql statements to execute
        :type sql: str or list
        :param parameters: The parameters to render the SQL query with.
        :type parameters: mapping or iterable
        """
        if sys.version_info[0] < 3:
            sql = sql.encode('utf-8')

        with closing(self.get_conn()) as conn:
            with closing(conn.cursor()) as cur:
    
                cur.outputtypehandler = self.OutputHandler

                if parameters is not None:
                    cur.execute(sql, parameters)
                else:
                    cur.execute(sql)
                return cur.fetchall()

    # Dealing with invalid years in the database
    def DateTimeConverter(self, value):
        print('DateTimeConverter was called')
        if value.startswith('4712'):
            return None
        return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')

    def OutputHandler(self, cursor, name, defaulttype, length, precision, scale):
        print('OutputHandler was called')
        if defaulttype == cx_Oracle.DATETIME:
            return cursor.var(cx_Oracle.STRING, arraysize=cursor.arraysize,
                              outconverter=self.DateTimeConverter)

def extract(extract_connection):
    # Return the extracted records
    extract_records_query = 'SELECT col1, col2, col3 FROM table'
    o_extract_hook = OracleHookTypeHandler(oracle_conn_id=extract_connection)
    print('Extract started')
    extract_records = o_extract_hook.get_records(sql=extract_records_query)
    return extract_records
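The converter logic can be checked without a database. This is a minimal sketch that copies the body of DateTimeConverter into a standalone function; `date_time_converter` and the two sample strings are made up for illustration and assume Oracle returns dates in the 'YYYY-MM-DD HH24:MI:SS' format set through NLS_DATE_FORMAT above.

```python
from datetime import datetime


def date_time_converter(value):
    """Return None for the corrupt year, otherwise parse the string."""
    if value.startswith('4712'):
        return None
    return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')


# Hypothetical sample values, not taken from the real table.
bad = date_time_converter('4712-01-01 00:00:00')
good = date_time_converter('2019-03-15 08:30:00')
print(bad, good)
```

Because the handler fetches the column as a string, the conversion to datetime happens in Python, where the corrupt value can be filtered out before parsing.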

1 Answer
Answer #1 · Posted 2024-10-06 11:28:52

You want to override the get_records method from the airflow.hooks.dbapi_hook.DbApiHook class; it does not call OracleHook.get_cursor, so your assignment has no effect:

def get_records(self, sql, parameters=None):
    """
    Executes the sql and returns a set of records.

    :param sql: the sql statement to be executed (str) or a list of
        sql statements to execute
    :type sql: str or list
    :param parameters: The parameters to render the SQL query with.
    :type parameters: mapping or iterable
    """
    if sys.version_info[0] < 3:
        sql = sql.encode('utf-8')

    with closing(self.get_conn()) as conn:
        with closing(conn.cursor()) as cur:
 
            # You have access to the `Cursor` (named "cur") object.

            if parameters is not None:
                cur.execute(sql, parameters)
            else:
                cur.execute(sql)
            return cur.fetchall()

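That said, it would probably be cleaner to put the parts of the code you need directly into the workflow (DAG) file that requires these modifications.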
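Another option, offered here as an untested sketch rather than something from the answer, is to attach the handler to the connection instead of to a single cursor. As far as I know, cx_Oracle connections also expose an outputtypehandler attribute that applies to cursors which do not set their own. `OutputTypeHandlerMixin`, `FakeConnection` and `FakeHook` are hypothetical names; in real use the mixin would be combined with OracleHook instead of the fake hook.

```python
class OutputTypeHandlerMixin:
    """Attach an output type handler to every connection the hook opens."""

    def get_conn(self):
        conn = super().get_conn()
        conn.outputtypehandler = self.output_type_handler
        return conn

    def output_type_handler(self, cursor, name, default_type, size,
                            precision, scale):
        # Placeholder: returning None keeps the driver's default handling.
        return None


class FakeConnection:
    """Stand-in for a cx_Oracle connection (assumption for the demo)."""
    outputtypehandler = None


class FakeHook:
    """Stand-in for OracleHook; only get_conn() matters here."""

    def get_conn(self):
        return FakeConnection()


class FakeHookWithHandler(OutputTypeHandlerMixin, FakeHook):
    pass


hook = FakeHookWithHandler()
conn = hook.get_conn()
print(conn.outputtypehandler == hook.output_type_handler)
```

With this layout, any hook method that goes through get_conn would pick up the handler, so get_records would not need to be copied and overridden.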
