我使用气流和OracleHook在数据库中进行池处理。 我通过图形界面使用oracle connection执行了连接配置
在执行第一个dag时,在select期间,创建了大约100个会话。 我可以通过监视oracle表来跟踪这一点
SELECT *
FROM v$session;
我希望将一些配置传递给oracle或钩子,例如,可以将连接池或与数据库的会话的最大值限制为20
以下是我的DAG中的一个片段:
args = {
'owner': 'dekstra',
'depends_on_past': False,
'start_date': days_ago(2),
'retries': 1
}
dag = DAG(
'documents',
schedule_interval=timedelta(hours=2),
default_args=args
)
listing = PythonOperator(
task_id='listing',
provide_context=True,
python_callable=DocumentsService.listing,
dag=dag
)
listing
class DocumentsService:
@staticmethod
def listing(**kwargs):
data = [column[0] for column in
OracleHook(oracle_conn_id='oracle_connection').get_records(sql=DocumentsQuery.listing())]
logger.info("Remaining Documents: {}".format(len(data)))
shuffle(data)
return data[:50]
我将cx_Oracle
库与OracleHook
结合使用,可以将会话池限制为指定的here,但我不知道如何将此配置传递给OracleHook
目前没有回答
相关问题 更多 >
编程相关推荐