我正在查询一个非常旧的满表。它包含一些损坏的数据,而我没有修改该表或创建视图的权限。每当我使用 Airflow 的 OracleHook.get_records 查询数据时,都会收到错误消息 "ValueError: year -4712 is out of range"。我希望按照下面这个 cx_Oracle 解决方案(《Problem empty date cause ValueError: year -9999 is out of range》)的做法,对这类记录返回 None 来处理这个问题。这需要设置 cx_Oracle 的 outputtypehandler 属性。但当我运行下面的代码时,OutputHandler 和 DateTimeConverter 函数都没有被调用,代码仍然以与使用基本 OracleHook 类时相同的 ValueError 失败。任何帮助都将不胜感激。
from airflow.hooks.oracle_hook import OracleHook
import cx_Oracle
from datetime import datetime
import os
# Sets the NLS_DATE_FORMAT environment variable to the same layout that
# DateTimeConverter parses with '%Y-%m-%d %H:%M:%S'.
# NOTE(review): presumably this has to run before the Oracle connection is
# opened, and the Oracle client may ignore it unless NLS_LANG is also set -
# confirm against the client documentation.
os.environ['NLS_DATE_FORMAT'] = 'YYYY-MM-DD HH24:MI:SS'
class OracleHookTypeHandler(OracleHook):
    """First attempt at an OracleHook with a cx_Oracle output type handler.

    This is the version the question reports as NOT working: neither
    ``OutputHandler`` nor ``DateTimeConverter`` is ever called and
    ``get_records`` still fails with the original ``ValueError``. The logic
    is kept as posted so the failure stays reproducible; only the layout has
    been restored.
    """

    def __init__(self, oracle_conn_id):
        OracleHook.__init__(self, oracle_conn_id)
        # NOTE: ``OracleHook.get_cursor`` is referenced here, not called, so
        # ``self.cursor`` holds the function object rather than a cursor.
        # The next line therefore only sets an attribute on that function.
        # The inherited ``get_records`` does not go through ``get_cursor``
        # either, so the handler never reaches the cursor that runs the query.
        self.cursor = OracleHook.get_cursor
        self.cursor.outputtypehandler = self.OutputHandler

    # Dealing with invalid years in the database
    def DateTimeConverter(self, value):
        """Convert a date string to ``datetime``, or ``None`` for bad years.

        :param value: date text in ``YYYY-MM-DD HH24:MI:SS`` layout.
        :return: ``None`` when the text starts with ``'4712'``, otherwise the
            parsed ``datetime``.
        :raises ValueError: if the text does not match the expected layout.
        """
        print('DateTimeConverter was called')
        if value.startswith('4712'):
            return None
        return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')

    def OutputHandler(self, cursor, name, defaulttype, length, precision, scale):
        """Output type handler that fetches DATETIME columns as strings.

        The parameters follow the signature used for cx_Oracle's
        ``outputtypehandler`` callback. For DATETIME columns a string
        variable is returned whose values are passed through
        ``DateTimeConverter``; for every other type nothing is returned.
        """
        print('OutputHandler was called')
        if defaulttype == cx_Oracle.DATETIME:
            return cursor.var(cx_Oracle.STRING, arraysize=cursor.arraysize,
                              outconverter=self.DateTimeConverter)
def extract(extract_connection):
    """Run the extract query and return the fetched records.

    :param extract_connection: connection id handed to
        ``OracleHookTypeHandler`` as ``oracle_conn_id``.
    :return: the result of ``get_records`` for the extract query.
    """
    extract_records_query = 'SELECT col1, col2, col3 FROM table'
    o_extract_hook = OracleHookTypeHandler(oracle_conn_id=extract_connection)
    print('Extract started')
    extract_records = o_extract_hook.get_records(sql=extract_records_query)
    # Return the extracted records
    return extract_records
更新:我按照下面 @joebeeson 的回答解决了这个问题。可以正常运行的代码如下:
from airflow.hooks.oracle_hook import OracleHook
import cx_Oracle
import sys
from datetime import datetime
from contextlib import closing
import os
# Sets the NLS_DATE_FORMAT environment variable to the same layout that
# DateTimeConverter parses with '%Y-%m-%d %H:%M:%S'.
# NOTE(review): presumably this has to run before the Oracle connection is
# opened, and the Oracle client may ignore it unless NLS_LANG is also set -
# confirm against the client documentation.
os.environ['NLS_DATE_FORMAT'] = 'YYYY-MM-DD HH24:MI:SS'
class OracleHookTypeHandler(OracleHook):
    """OracleHook whose ``get_records`` installs an output type handler.

    The handler is set on the cursor that actually executes the query, so
    DATETIME columns are fetched as strings and converted by
    ``DateTimeConverter``, which maps values starting with ``'4712'`` to
    ``None`` instead of letting them raise ``ValueError``.
    """

    def __init__(self, oracle_conn_id):
        OracleHook.__init__(self, oracle_conn_id)

    # Override get_records from inherited class dbapihook
    def get_records(self, sql, parameters=None):
        """
        Executes the sql and returns a set of records.

        :param sql: the sql statement to be executed (str) or a list of
            sql statements to execute
        :type sql: str or list
        :param parameters: The parameters to render the SQL query with.
        :type parameters: mapping or iterable
        :return: all rows fetched from the cursor after executing ``sql``.
        """
        if sys.version_info[0] < 3:
            sql = sql.encode('utf-8')

        with closing(self.get_conn()) as conn:
            with closing(conn.cursor()) as cur:
                # The handler has to be attached to this cursor, before
                # execute(), because this is the cursor that fetches the rows.
                cur.outputtypehandler = self.OutputHandler
                if parameters is not None:
                    cur.execute(sql, parameters)
                else:
                    cur.execute(sql)
                return cur.fetchall()

    # Dealing with invalid years in the database
    def DateTimeConverter(self, value):
        """Convert a date string to ``datetime``, or ``None`` for bad years.

        :param value: date text in ``YYYY-MM-DD HH24:MI:SS`` layout.
        :return: ``None`` when the text starts with ``'4712'`` (presumably
            how the out-of-range year -4712 is rendered - confirm), otherwise
            the parsed ``datetime``.
        :raises ValueError: if the text does not match the expected layout.
        """
        print('DateTimeConverter was called')
        if value.startswith('4712'):
            return None
        return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')

    def OutputHandler(self, cursor, name, defaulttype, length, precision, scale):
        """Output type handler that fetches DATETIME columns as strings.

        The parameters follow the signature used for cx_Oracle's
        ``outputtypehandler`` callback.

        :return: for DATETIME columns, a string variable whose values are
            passed through ``DateTimeConverter``; ``None`` for every other
            type, leaving the default handling in place.
        """
        print('OutputHandler was called')
        if defaulttype == cx_Oracle.DATETIME:
            return cursor.var(cx_Oracle.STRING, arraysize=cursor.arraysize,
                              outconverter=self.DateTimeConverter)
        return None
def extract(extract_connection):
    """Run the extract query and return the fetched records.

    :param extract_connection: connection id handed to
        ``OracleHookTypeHandler`` as ``oracle_conn_id``.
    :return: the result of ``get_records`` for the extract query.
    """
    extract_records_query = 'SELECT col1, col2, col3 FROM table'
    o_extract_hook = OracleHookTypeHandler(oracle_conn_id=extract_connection)
    print('Extract started')
    extract_records = o_extract_hook.get_records(sql=extract_records_query)
    # Return the extracted records
    return extract_records
您需要重写 airflow.hooks.dbapi_hook.DbApiHook 类中的 get_records 方法;该方法不会调用 OracleHook.get_cursor,因此您对 cursor 的赋值不会生效。不过,把您需要的那部分代码直接放进需要这些修改的流程文件里,可能会更简洁。
相关问题 更多 >
编程相关推荐