Airflow数据库挂钩中的SQLAlchemy引擎

奥利·格拉斯

从Airflow连接ID获取SQLAlchemy引擎的最佳方法是什么?

当前,我正在创建一个钩子,检索其URI,然后使用它来创建SQLAlchemy引擎。

postgres_hook = PostgresHook(self.postgres_conn_id)
engine = create_engine(postgres_hook.get_uri())

这可行,但是两个命令都建立了到数据库的连接。

当我在连接上具有“额外”参数时,需要第三个连接来检索那些参数(请参阅从Airflow Postgres钩子检索完整的连接URI

有没有更短,更直接的方法?

黄Huang

需要明确的是,实际上您的命令将建立两个数据库连接,但连接到两个单独的数据库(除非您尝试连接到Postgres Airflow数据库)。初始化挂钩的第一行不应进行任何连接。只有第二行首先从Airflow数据库中获取连接详细信息(我认为您无法避免),然后使用它来连接到Postgres数据库(我认为这是重点)。

您可以使用以下方法使它稍微简单一些:

postgres_hook = PostgresHook(self.postgres_conn_id)
engine = postgres_hook.get_sqlalchemy_engine()

看起来很干净,但是如果您想不通过而获得更直接的指示PostgresHook,则可以通过查询Airflow的数据库直接获取。但是,这意味着您将最终复制代码以从连接对象构建URI。如果要继续进行此操作,则get_connection()的基础实现是一个很好的示例。

from airflow.settings import Session

conn = session.query(Connection).filter(Connection.conn_id == self.postgres_conn_id).one()
... # build uri from connection
create_engine(uri)

此外,如果您希望能够访问extras而无需单独的数据库中获取超越get_uri()get_sqlalchemy_engine()不,是可以覆盖BaseHook.get_connection()来连接对象保存到重用一个实例变量。这将需要在之上创建您自己的钩子PostgresHook,因此我知道这可能不是理想的选择。

class CustomPostgresHook(PostgresHook):

    @classmethod
    def get_connection(cls, conn_id):  # type: (str) -> Connection
        conn = super().get_connection(conn_id)
        self.conn_obj = conn  # can't use self.conn because PostgresHook will overriden in https://github.com/apache/airflow/blob/1.10.10/airflow/hooks/postgres_hook.py#L93 by a different type of connection
        return conn

postgres_hook = CustomPostgresHook(self.postgres_conn_id)
uri = postgres_hook.get_uri()
# do something with postgres_hook.conn_obj.extras_dejson

某些内置的Airflow挂钩已经具有此行为(grpc,samba,tableau),但绝对不是标准化的。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

在“删除后”挂钩中获取数据库操作的状态

来自分类Dev

数据库引擎的条件更改

来自分类Dev

Access数据库引擎问题

来自分类Dev

SqlAlchemy数据库问题

来自分类Dev

我无法在挂钩中保存数据

来自分类Dev

如何显示useEffect挂钩中的数据?

来自分类Dev

Rails的模型挂钩是否等待数据库事务完成?

来自分类Dev

挂钩和数据库更新后保存Magento地址

来自分类Dev

使用SQLAlchemy连接到数据库

来自分类Dev

SQLAlchemy:难以查询我的数据库

来自分类Dev

使用SQLAlchemy列出数据库表

来自分类Dev

Flask / SQLAlchemy插入数据库

来自分类Dev

SQLAlchemy:数据库删除错误

来自分类Dev

SQLAlchemy如何跟踪数据库更改?

来自分类Dev

SQLAlchemy 2.0迁移-与数据库的连接

来自分类Dev

Python,SQLAlchemy,从数据库获取ID

来自分类Dev

SQLAlchemy:难以查询我的数据库

来自分类Dev

从数据库引擎返回实体时异步处理它们

来自分类Dev

Microsoft SQL Server 2012无数据库引擎

来自分类Dev

搜索Java嵌入式数据库框架/引擎

来自分类Dev

如何下载pentaho数据库模型和报告引擎?

来自分类Dev

PostgreSQL引擎连接限制的RDS数据库

来自分类Dev

极简博客引擎的理想数据库

来自分类Dev

Django / Python应用程序的数据库引擎选择

来自分类Dev

搜索Java嵌入式数据库框架/引擎

来自分类Dev

Delphi Borland数据库引擎64位编译

来自分类Dev

Microsoft JET数据库引擎错误:“正在使用文件”

来自分类Dev

SQL Server 导入向导,Access 数据库引擎

来自分类Dev

不同数据库引擎之间的本机查询日期格式