来自气流数据库钩子的SQLAlchemy引擎

2024-09-29 17:13:01 发布

您现在位置:Python中文网/ 问答频道 /正文

从气流连接ID获取SQLAlchemy引擎的最佳方法是什么

目前我正在创建一个钩子,检索它的URI,然后使用它创建一个SQLAlchemy引擎

postgres_hook = PostgresHook(self.postgres_conn_id)
engine = create_engine(postgres_hook.get_uri())

这是可行的,但两个命令都连接到数据库

当我在连接上有“额外”参数时,需要第三个连接来检索这些参数(请参见Retrieve full connection URI from Airflow Postgres hook

有没有一种更短更直接的方法


Tags: 方法引擎selfid参数sqlalchemypostgresuri
1条回答
网友
1楼 · 发布于 2024-09-29 17:13:01

要明确的是,您的命令确实会建立两个数据库连接,但它只能连接到两个单独的数据库(除非您试图连接到Postgres数据库)。初始化钩子的第一行不应该建立任何连接。只有第二行首先从Airflow数据库中获取连接细节(我认为您无法避免),然后使用该细节连接到Postgres数据库(我认为这就是重点)

不过,您可以使用以下工具使其稍微简单一些:

postgres_hook = PostgresHook(self.postgres_conn_id)
engine = postgres_hook.get_sqlalchemy_engine()

这看起来很干净,但是如果您不想通过PostgresHook而更直接地获取它,您可以通过查询Airflow的数据库直接获取它。但是,这意味着您最终将复制代码,从连接对象构建URI。如果您想继续此操作,那么底层的implementation of get_connection()就是一个很好的示例

from airflow.settings import Session

conn = session.query(Connection).filter(Connection.conn_id == self.postgres_conn_id).one()
... # build uri from connection
create_engine(uri)

此外,如果您希望能够访问extras,而不使用get_uri()get_sqlalchemy_engine()之外的单独数据库获取,则可以重写BaseHook.get_connection(),将连接对象保存到实例变量以供重用。这需要在PostgresHook之上创建自己的钩子,所以我知道这可能并不理想

class CustomPostgresHook(PostgresHook):

    @classmethod
    def get_connection(cls, conn_id):  # type: (str) -> Connection
        conn = super().get_connection(conn_id)
        self.conn_obj = conn  # can't use self.conn because PostgresHook will overriden in https://github.com/apache/airflow/blob/1.10.10/airflow/hooks/postgres_hook.py#L93 by a different type of connection
        return conn

postgres_hook = CustomPostgresHook(self.postgres_conn_id)
uri = postgres_hook.get_uri()
# do something with postgres_hook.conn_obj.extras_dejson

一些内置的气流挂钩已经有了这种行为(grpc、samba、tableau),但它肯定不是标准化的

相关问题 更多 >

    热门问题