数据工程与数据科学管道框架

snowflet的Python项目详细描述


雪花提取荷载转换框架

为什么在斯诺夫莱特我比E先?我真的很喜欢雪佛兰的声音

需要环境变量

"PROJECT_ROOT": "${ProjectFolder}"             # REQUIRED
"ACCOUNT":  "gsXXXXX.west-europe.azure"        # REQUIRED
"USER": "user"                                 # REQUIRED
"PASSWORD": secret_password                    # REQUIRED
"DATABASE": "default_database"                 # OPTIONAL
"SCHEMA": "default_schema"                     # OPTIONAL
"WAREHOUSE": ""                                # OPTIONAL
"ROLE": ""                                     # OPTIONAL
"TIMEZONE": "europe/london"                    # OPTIONAL

classsnowflet.db.DBExecutor()

雪花API包装

方法

validate_connection()返回雪花版本

query_exec()执行sql查询

参数:

  • file_query:查询文件的路径,无论是传递这个还是查询,都可以包含{parameters}
  • query:要执行的sql查询,可以包含{parameters}
  • return_df:默认为False,在SELECT查询中传递True,返回pandas数据帧
  • kwargs:sql中的参数将替换为相应的kwarg值
^{pr2}$

用法

db = db() # initiate the snowflake connection using env variables
db.close() # close and dismiss the connection

classsnowflet.db.PipelineExecutor()

用于在雪花中执行管道的特殊工具,该工具读取描述管道步骤的yaml文件,并提供运行或测试管道的方法(单元和/或uat)

注释

所有查询文件应符合以下要求(包括模拟数据的CTE):

  • 数据库和模式应明确,即“数据库”、“模式”、“表”或数据库.schema.table在

方法

run()执行管道

clone_prod()TBD克隆prod db元数据

clone_clean()TBD删除了克隆的数据库

用法

  • 用于运行管道
from snoflet import PipelineExecutor
pipeline = PipelineExecutor(
    "path_to_pipeline_folder/pipeline_name.yaml")     # initiate PipelineExecutor for Run
pipeline.run()                                        # run the pipeline
  • 用于ci cd(测试)
from snoflet import PipelineExecutor
pipeline = PipelineExecutor(
    "path_to_pipeline_folder/pipeline_name.yaml", 
    test=True
    )                                                 # initiate PipelineExecutor for testing
pipeline.run_unit_tests()                             # run all unit tests in parallel
try:
    pipeline.clone_prod()                    # copy tables' structure from prod
    pipeline.run()                                    # run the pipeline on empty tables (dry_run)
finally:
    pipeline.clone_clean()                          # cleans the dev/test environment

YAML定义

结构:

desc: 
databases: 
batches:    
release:

数据库

管道中引用的数据库列表

['database1', 'database2', 'database3']

释放

在执行管道之前执行的文件的列表

示例

release:
  date: "2020-05-07"
  desc: "change table schema and delete a table from prod"
  files:
    - path_to_file1

批次
  • 包含要执行的批处理列表
  • 批处理以串行方式执行
  • 批处理中的任务并行运行
batches:
-   desc: creates table structure
    tasks:
-   desc: creates staging tables
    tasks:
-   desc: creates aggregated tables
    tasks:

任务:

-   desc: creates aggregated tables
    tasks:
    -   desc: use Database
        object: query_executor
        args:
        -   file_query: path_to_file.sql
    -   desc: create table1
        object: create_table
        args:
        -   file: path_to_sql_query_select_file.sql
            table_schema: path_to_schema_definition_file.sql
            database_id: dbtest
            schema_id: sctest
            table_id: tbtest
            mock_file: path_to_mock_file.sql
            output_table_name: staging.attr_order_items_pk 

对象类型

  • 查询执行器:

它是snowflet.db.exec_查询,相同参数

  • 创建表:

它是snowflet.db.create_表,相同参数

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何更改由场景生成器生成的按钮的颜色?   DFT中的java幅度谱   java如何从安卓中删除软件包。罐子   java使用Guava Collections2过滤ArrayList。Android中的过滤器   为什么分支预测比没有分支更快?   java Spring@AutoWired始终为空   java mapreduce计数差异   目录如何在Java中创建文件夹?   java Edittext在单击时失去焦点   java如何使用从密码派生的密钥正确加密和解密文件   java文件更改加载策略不反映应用程序中所做的更改。属性文件,而不重新启动服务器   java Web。xml文件工作不正常   java渲染图像以120 fps的速度闪烁   Spring批处理中的java未终止双引号   jerseyspring中过滤器的java单元测试   在Android应用程序上引发未经处理的异常时,java Eclipse中断用户代码   netbeans Java映像处理   java如何使用安卓 studio在Facebook时间线字段上传文本?   如果我知道文件的字符串名,如何访问R.file?   java Bullet物理,纹理球体不滚动