数据工程与数据科学管道框架
snowflet的Python项目详细描述
雪花提取荷载转换框架
为什么在斯诺夫莱特我比E先?我真的很喜欢雪佛兰的声音
需要环境变量
"PROJECT_ROOT": "${ProjectFolder}" # REQUIRED
"ACCOUNT": "gsXXXXX.west-europe.azure" # REQUIRED
"USER": "user" # REQUIRED
"PASSWORD": secret_password # REQUIRED
"DATABASE": "default_database" # OPTIONAL
"SCHEMA": "default_schema" # OPTIONAL
"WAREHOUSE": "" # OPTIONAL
"ROLE": "" # OPTIONAL
"TIMEZONE": "europe/london" # OPTIONAL
classsnowflet.db.DBExecutor()
雪花API包装
方法
validate_connection()返回雪花版本
query_exec()执行sql查询
参数:
- file_query:查询文件的路径,无论是传递这个还是查询,都可以包含{parameters}
- query:要执行的sql查询,可以包含{parameters}
- return_df:默认为False,在SELECT查询中传递True,返回pandas数据帧
- kwargs:sql中的参数将替换为相应的kwarg值
用法
db = db() # initiate the snowflake connection using env variables
db.close() # close and dismiss the connection
classsnowflet.db.PipelineExecutor()
用于在雪花中执行管道的特殊工具,该工具读取描述管道步骤的yaml文件,并提供运行或测试管道的方法(单元和/或uat)
注释
所有查询文件应符合以下要求(包括模拟数据的CTE):
- 数据库和模式应明确,即“数据库”、“模式”、“表”或数据库.schema.table在
方法
run()执行管道
clone_prod()TBD克隆prod db元数据
clone_clean()TBD删除了克隆的数据库
用法
- 用于运行管道
from snoflet import PipelineExecutor
pipeline = PipelineExecutor(
"path_to_pipeline_folder/pipeline_name.yaml") # initiate PipelineExecutor for Run
pipeline.run() # run the pipeline
- 用于ci cd(测试)
from snoflet import PipelineExecutor
pipeline = PipelineExecutor(
"path_to_pipeline_folder/pipeline_name.yaml",
test=True
) # initiate PipelineExecutor for testing
pipeline.run_unit_tests() # run all unit tests in parallel
try:
pipeline.clone_prod() # copy tables' structure from prod
pipeline.run() # run the pipeline on empty tables (dry_run)
finally:
pipeline.clone_clean() # cleans the dev/test environment
YAML定义
结构:
desc:
databases:
batches:
release:
数据库
管道中引用的数据库列表
['database1', 'database2', 'database3']
释放
在执行管道之前执行的文件的列表
示例
release:
date: "2020-05-07"
desc: "change table schema and delete a table from prod"
files:
- path_to_file1
批次
- 包含要执行的批处理列表
- 批处理以串行方式执行
- 批处理中的任务并行运行
batches:
- desc: creates table structure
tasks:
- desc: creates staging tables
tasks:
- desc: creates aggregated tables
tasks:
batches:
- desc: creates table structure
tasks:
- desc: creates staging tables
tasks:
- desc: creates aggregated tables
tasks:
任务:
- desc: creates aggregated tables
tasks:
- desc: use Database
object: query_executor
args:
- file_query: path_to_file.sql
- desc: create table1
object: create_table
args:
- file: path_to_sql_query_select_file.sql
table_schema: path_to_schema_definition_file.sql
database_id: dbtest
schema_id: sctest
table_id: tbtest
mock_file: path_to_mock_file.sql
output_table_name: staging.attr_order_items_pk
对象类型
- 查询执行器:
它是snowflet.db.exec_查询,相同参数
- 创建表:
它是snowflet.db.create_表,相同参数
- 项目
标签: