自以为是的轻量级etl管道框架

data-integration的Python项目详细描述


MARA数据集成

Build StatusPyPI - LicensePyPI versionSlack Status

这个包包含一个轻量级ETL框架,重点是透明性和复杂性降低。它有许多成熟的假设/原则:

  • 数据集成管道作为代码:管道、任务和命令是使用声明性python代码创建的。

  • PostgreSQL作为数据处理引擎。

  • 广泛的网络用户界面。web浏览器作为管道检测、运行和调试的主要工具。

  • GNU生成语义。节点依赖于上游节点的完成。没有数据依赖关系或数据流。

  • 无应用内数据处理:命令行工具是与数据库和数据交互的主要工具。

  • 基于python的multiprocessing的单机管道执行。不需要分布式任务队列。易于调试和输出日志记录。

  • 基于成本的优先级队列:首先运行成本较高的节点(基于记录的运行时间)。

安装

要直接使用库,请使用pip:

pip install data-integration

pip install git+https://github.com/mara/data-integration.git

对于集成到flask应用程序中的示例,请查看mara example project

示例

这里是一个管道“演示”,由三个相互依赖的节点组成:任务ping_localhost、管道sub_pipeline和任务sleep

fromdata_integration.commands.bashimportRunBashfromdata_integration.pipelinesimportPipeline,Taskfromdata_integration.ui.cliimportrun_pipeline,run_interactivelypipeline=Pipeline(id='demo',description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')pipeline.add(Task(id='ping_localhost',description='Pings localhost',commands=[RunBash('ping -c 3 localhost')]))sub_pipeline=Pipeline(id='sub_pipeline',description='Pings a number of hosts')forhostin['google','amazon','facebook']:sub_pipeline.add(Task(id=f'ping_{host}',description=f'Pings {host}',commands=[RunBash(f'ping -c 3 {host}.com')]))sub_pipeline.add_dependency('ping_amazon','ping_facebook')sub_pipeline.add(Task(id='ping_foo',description='Pings foo',commands=[RunBash('ping foo')]),['ping_amazon'])pipeline.add(sub_pipeline,['ping_localhost'])pipeline.add(Task(id='sleep',description='Sleeps for 2 seconds',commands=[RunBash('sleep 2')]),['sub_pipeline'])

任务包含执行实际工作的命令列表(在本例中运行ping各种主机的bash命令)。

为了运行管道,需要配置postgresql数据库来存储运行时信息、运行输出和增量处理的状态:

importmara_db.auto_migrationimportmara_db.configimportmara_db.dbsmara_db.config.databases \
    =lambda:{'mara':mara_db.dbs.PostgreSQLDB(host='localhost',user='root',database='example_etl_mara')}mara_db.auto_migration.auto_discover_models_and_migrate()

假设postgressql正在运行并且凭据正常工作,则输出如下所示(创建一个包含多个表的数据库):

Created database "postgresql+psycopg2://root@localhost/example_etl_mara"

CREATE TABLE data_integration_file_dependency (
    node_path TEXT[] NOT NULL, 
    dependency_type VARCHAR NOT NULL, 
    hash VARCHAR, 
    timestamp TIMESTAMP WITHOUT TIME ZONE, 
    PRIMARY KEY (node_path, dependency_type)
);

.. more tables

客户端用户界面

这将运行一个输出到stdout的管道:

fromdata_integration.ui.cliimportrun_pipelinerun_pipeline(pipeline)

Example run cli 1

它运行管道的单个节点sub_pipeline,以及它所依赖的所有节点:

run_pipeline(sub_pipeline,nodes=[sub_pipeline.nodes['ping_amazon']],with_upstreams=True)

Example run cli 2

最后,还有一种基于pythondialog的菜单,允许导航和运行这样的管道:

fromdata_integration.ui.cliimportrun_interactivelyrun_interactively()

Example run cli 3

网络用户界面

更重要的是,这个包提供了一个广泛的web界面。它可以很容易地集成到任何基于Flask的应用程序中,并且mara example project演示了如何使用mara-app实现这一点。

对于每个管道,都有一个显示

  • 所有子节点及其依赖关系的图
  • 管道的总体运行时间图表以及过去30天内最昂贵的节点(可配置)
  • 所有管道节点及其平均运行时间和由此产生的队列优先级的表
  • 管道最后一次运行的输出和时间线

Mara data integration web ui 1

对于每个任务,都有一个页面显示

  • 管道中任务的上游和下游
  • 过去30天内任务的运行时间
  • 任务的所有命令
  • 任务最后一次运行的输出

Mara data integration web ui 2

管道和任务可以直接从Web UI运行,这可能是此软件包的主要功能之一:

Example run web ui

入门

文档目前正在处理中。请使用mara example project作为入门参考。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
二维数组的java置换   eclipse如何在java中提供读取文件的相对地址   java将嵌套文档添加到嵌套文档数组中   java为什么要删除。导致maven编译错误的目录?   java Weave AspectJ方面依赖于约束注释   java如何在listview中选择后自动播放视频序列?   JavaEEWeb应用程序与Web服务   JavaSpringWebFlow提交包含新项的数组   java Try and Catch with JOption无法按预期工作   java如何使用导航抽屉中的字符串链接本地HTML页面?   java如何确保关闭文件   这能在一个Mac桌面应用程序中使用Java后端提供的ObjectiveC UI吗?   protobuf生成的代码导致的java Eclipse构建循环(与Maven Project Builder相关)   java JPA onetomany过滤   java实体管理器批量更新提供了组织。冬眠PersistentObjectException:传递给persist的分离实体   macos java。lang.RuntimeException:无法启动Selenium会话:   java Glide转换选择墙纸的位置?   java(在实体上使用@ConditionalOnProperty的变通方法)   Android系列。对java进行排序。lang.NullPointerException