PySpark数据序列化程序
marshmallow-pyspark的Python项目详细描述
棉花糖
Marshmallow是用于数据序列化和验证的流行包。 一种方法是在棉花糖中定义数据模式,其中包含关于如何封送输入数据的规则。类似于棉花糖, pyspark还提供了自己的模式定义,用于 处理数据帧。此包使用户能够利用棉花糖模式及其强大的数据验证功能 在pyspark应用程序中。在数据一致性和质量较高的数据管道ETL作业中,可以使用这些功能 很重要。在
安装
可以使用pip
安装包:
$ pip install marshmallow-pyspark
使用
数据模式可以像使用棉花糖一样定义。下面是一个简单的例子:
^{pr2}$更多选项
除了支持棉花糖的选项之外,Schema
类还附带了两个附加的初始化参数:
- 在
在error_column_name
:要存储验证错误的列的名称。默认值是_errors
。在 - 在
在split_errors
:将包含验证错误的行作为独立的数据帧与有效行分开。当设置为False
时 包含错误的行将与有效行一起作为单个数据帧返回。所有错误行的字段值为 设置为null
。为了方便用户,可以在错误JSON的row
属性中找到原始字段值。 默认值是True
。在
示例如下:
frommarshmallowimportEXCLUDEschema=AlbumSchema(error_column_name="custom_errors",# Use 'custom_errors' as name for errors columnsplit_errors=False,# Don't split the input data frame into valid and errorsunkown=EXCLUDE# Marshmallow option to exclude fields not present in schema)# Input data frame to validate.df=spark.createDataFrame([{"title":"valid_1","release_date":"2020-1-10","garbage":"wdacfa"},{"title":"valid_2","release_date":"2020-1-11","garbage":"5wacfa"},{"title":"invalid_1","release_date":"2020-31-11","garbage":"3aqf"},{"title":"invalid_2","release_date":"2020-1-51","garbage":"vda"},])valid_df,errors_df=schema.validate_df(df)# Output of valid data frame. Contains rows with errors as# the option 'split_errors' was set to False.valid_df.show()# +-------+------------+--------------------+# | title|release_date| _errors|# +-------+------------+--------------------+# |valid_1| 2020-01-10| |# |valid_2| 2020-01-11| |# | | |{"row": {"release...|# | | |{"row": {"release...|# +-------+------------+--------------------+# The errors data frame will be set to Noneasserterrors_dfisNone# True
最后,除了在模式中传递特定于棉花糖的选项之外,还可以在validate_df
方法中传递它们。
这些选项被传递给棉花糖的load
方法:
schema=AlbumSchema(error_column_name="custom_errors",# Use 'custom_errors' as name for errors columnsplit_errors=False,# Don't split the input data frame into valid and errors)valid_df,errors_df=schema.validate_df(df,unkown=EXCLUDE)
重复
marshmallowpyspark提供了验证一个或多个模式字段是否存在重复值的功能。这就实现了
通过将字段名添加到架构的UNIQUE
属性,如下所示:
classAlbumSchema(Schema):# Unique valued field "title" in the schemaUNIQUE=["title"]title=fields.Str()release_date=fields.Date()# Input data frame to validate.df=spark.createDataFrame([{"title":"title_1","release_date":"2020-1-10"},{"title":"title_2","release_date":"2020-1-11"},{"title":"title_2","release_date":"2020-3-11"},# duplicate title{"title":"title_3","release_date":"2020-1-51"},])# Validate data framevalid_df,errors_df=AlbumSchema().validate_df(df)# List of valid rowsvalid_rows=[row.asDict(recursive=True)forrowinvalid_df.collect()]## [# {'title': 'title_1', 'release_date': datetime.date(2020, 1, 10)},# {'title': 'title_2', 'release_date': datetime.date(2020, 1, 11)}# ]## Rows with errorserror_rows=[row.asDict(recursive=True)forrowinerrors_df.collect()]# # [# {'_errors': '{"row": {"release_date": "2020-3-11", "title": "title_2", "__count__title": 2}, '# '"errors": ["duplicate row"]}'},# {'_errors': '{"row": {"release_date": "2020-1-51", "title": "title_3", "__count__title": 1}, '# '"errors": {"release_date": ["Not a valid date."]}}'}# ]#
在link中讨论了删除重复但先保留的技术。
如果模式中有多个唯一字段,只需将它们添加到UNIQUE
,例如UNIQUE=["title", "release_date"]
。
您甚至可以通过在列表中分组来指定字段组合的唯一性:
classAlbumSchema(Schema):# Combined values of "title" and "release_date" should be uniqueUNIQUE=[["title","release_date"]]title=fields.Str()release_date=fields.Date()# Input data frame to validate.df=spark.createDataFrame([{"title":"title_1","release_date":"2020-1-10"},{"title":"title_2","release_date":"2020-1-11"},{"title":"title_2","release_date":"2020-3-11"},{"title":"title_3","release_date":"2020-1-21"},{"title":"title_3","release_date":"2020-1-21"},{"title":"title_4","release_date":"2020-1-51"},])# Validate data framevalid_df,errors_df=AlbumSchema().validate_df(df)# List of valid rowsvalid_rows=[row.asDict(recursive=True)forrowinvalid_df.collect()]## [# {'title': 'title_1', 'release_date': datetime.date(2020, 1, 10)},# {'title': 'title_2', 'release_date': datetime.date(2020, 1, 11)},# {'title': 'title_3', 'release_date': datetime.date(2020, 1, 21)}# ]## Rows with errorserror_rows=[row.asDict(recursive=True)forrowinerrors_df.collect()]# # [# {'_errors': '{"row": {"release_date": "2020-1-21", "title": "title_3", '# '"__count__title": 2, "__count__release_date": 2}, '# '"errors": ["duplicate row"]}'},# {'_errors': '{"row": {"release_date": "2020-1-51", "title": "title_4", '# '"__count__title": 1, "__count__release_date": 1}, '# '"errors": {"release_date": ["Not a valid date."]}}'},# {'_errors': '{"row": {"release_date": "2020-3-11", "title": "title_2", '# '"__count__title": 2, "__count__release_date": 1}, '# '"errors": ["duplicate row"]}'}# ]#
WARNING:重复检查需要对每个唯一字段进行数据洗牌。拥有大量的唯一字段将产生影响
激发工作绩效。默认情况下,UNIQUE
被设置为一个空列表,以防止任何重复检查。在
字段
棉花糖有各种不同的字段,可以用来定义模式。内部棉花糖Pypark 将这些字段转换为pyspark SQL数据类型。下表列出了支持的棉花糖字段及其 等效spark SQL数据类型:
Marshmallow | PySpark |
---|---|
^{ | user specified |
^{ | ^{ |
^{ | ^{ |
^{ | ^{ |
^{ | ^{ |
^{ | ^{ |
^{ | ^{ |
^{ | ^{ |
^{ | ^{ |
^{ | ^{ |
^{ | ^{ |
默认情况下,StringType
数据类型用于不在上表中的棉花糖字段。spark_schema
属性
可用于检查转换后的spark SQL架构:
# Gets the spark schema for the Album schemaAlbumSchema().spark_schema# StructType(List(StructField(title,StringType,true),StructField(release_date,DateType,true),StructField(_errors,StringType,true)))
自定义字段
棉花糖pyspark附带了一个额外的Raw
字段的支持。Raw
字段不执行任何格式设置
并要求用户指定与字段关联的spark数据类型。请参见以下示例:
frommarshmallow_pysparkimportSchemafrommarshmallow_pyspark.fieldsimportRawfrommarshmallowimportfieldsfrompyspark.sql.typesimportDateTypefromdatetimeimportdateclassAlbumSchema(Schema):title=fields.Str()# Takes python datetime.date objects and treats them as pyspark DateTyperelease_date=Raw(spark_type=DateType())# Input data frame to validate.df=spark.createDataFrame([{"title":"title_1","release_date":date(2020,1,10)},{"title":"title_2","release_date":date(2020,1,11)},{"title":"title_3","release_date":date(2020,3,10)},])# Validate data framevalid_df,errors_df=AlbumSchema().validate_df(df)# List of valid rowsvalid_rows=[row.asDict(recursive=True)forrowinvalid_df.collect()]## [# {'title': 'title_1', 'release_date': datetime.date(2020, 1, 10)},# {'title': 'title_2', 'release_date': datetime.date(2020, 1, 11)},# {'title': 'title_3', 'release_date': datetime.date(2020, 3, 10)}# ]## Rows with errorserror_rows=[row.asDict(recursive=True)forrowinerrors_df.collect()]# # []#
还可以添加对自定义棉花糖字段或上表中缺少的字段的支持。为了这样做,
您需要为自定义字段创建一个转换器。可以使用ConverterABC
接口构建转换器:
frommarshmallow_pysparkimportConverterABCfrompyspark.sql.typesimportStringTypeclassEmailConverter(ConverterABC):""" Converter to convert marshmallow's Email field to a pyspark SQL data type. """defconvert(self,ma_field):returnStringType()
提供convert
方法中的ma_field
参数来处理嵌套字段。例如,您可以签出
^{cd44}。现在,最后一步是将转换器添加到模式的CONVERTER_MAP
属性中:
frommarshmallow_pysparkimportSchemafrommarshmallowimportfieldsclassUser(Schema):name=fields.String(required=True)email=fields.Email(required=True)# Adding email converter to schema.User.CONVERTER_MAP[fields.Email]=EmailConverter# You can now use your schema to validate the input data frame.valid_df,errors_df=User().validate_df(input_df)
发展
要在本地破解棉花糖pyspark:
$ pip install -e .[dev]# to install all dependencies $ pytest --cov-config .coveragerc --cov=./ # to get coverage report $ pylint marshmallow_pyspark # to check code quality with PyLint
您可以选择使用make
来执行开发任务。在
许可证
源代码是在Apache许可证版本2下授权的。在
一氧化碳贡献
拉请求永远欢迎!:)
- 项目
标签: