PySpark数据序列化程序

marshmallow-pyspark的Python项目详细描述


棉花糖

Build Statuscodecov.ioApache 2.0 licensed

Marshmallow是用于数据序列化和验证的流行包。 一种方法是在棉花糖中定义数据模式,其中包含关于如何封送输入数据的规则。类似于棉花糖, pyspark还提供了自己的模式定义,用于 处理数据帧。此包使用户能够利用棉花糖模式及其强大的数据验证功能 在pyspark应用程序中。在数据一致性和质量较高的数据管道ETL作业中,可以使用这些功能 很重要。在

安装

可以使用pip安装包:

$ pip install marshmallow-pyspark

使用

数据模式可以像使用棉花糖一样定义。下面是一个简单的例子:

^{pr2}$

更多选项

除了支持棉花糖的选项之外,Schema类还附带了两个附加的初始化参数:

  • error_column_name:要存储验证错误的列的名称。默认值是_errors。在

  • split_errors:将包含验证错误的行作为独立的数据帧与有效行分开。当设置为False时 包含错误的行将与有效行一起作为单个数据帧返回。所有错误行的字段值为 设置为null。为了方便用户,可以在错误JSON的row属性中找到原始字段值。 默认值是True。在

示例如下:

frommarshmallowimportEXCLUDEschema=AlbumSchema(error_column_name="custom_errors",# Use 'custom_errors' as name for errors columnsplit_errors=False,# Don't split the input data frame into valid and errorsunkown=EXCLUDE# Marshmallow option to exclude fields not present in schema)# Input data frame to validate.df=spark.createDataFrame([{"title":"valid_1","release_date":"2020-1-10","garbage":"wdacfa"},{"title":"valid_2","release_date":"2020-1-11","garbage":"5wacfa"},{"title":"invalid_1","release_date":"2020-31-11","garbage":"3aqf"},{"title":"invalid_2","release_date":"2020-1-51","garbage":"vda"},])valid_df,errors_df=schema.validate_df(df)# Output of valid data frame. Contains rows with errors as# the option 'split_errors' was set to False.valid_df.show()#    +-------+------------+--------------------+#    |  title|release_date|             _errors|#    +-------+------------+--------------------+#    |valid_1|  2020-01-10|                    |#    |valid_2|  2020-01-11|                    |#    |       |            |{"row": {"release...|#    |       |            |{"row": {"release...|#    +-------+------------+--------------------+# The errors data frame will be set to Noneasserterrors_dfisNone# True

最后,除了在模式中传递特定于棉花糖的选项之外,还可以在validate_df方法中传递它们。 这些选项被传递给棉花糖的load方法:

schema=AlbumSchema(error_column_name="custom_errors",# Use 'custom_errors' as name for errors columnsplit_errors=False,# Don't split the input data frame into valid and errors)valid_df,errors_df=schema.validate_df(df,unkown=EXCLUDE)

重复

marshmallowpyspark提供了验证一个或多个模式字段是否存在重复值的功能。这就实现了 通过将字段名添加到架构的UNIQUE属性,如下所示:

classAlbumSchema(Schema):# Unique valued field "title" in the schemaUNIQUE=["title"]title=fields.Str()release_date=fields.Date()# Input data frame to validate.df=spark.createDataFrame([{"title":"title_1","release_date":"2020-1-10"},{"title":"title_2","release_date":"2020-1-11"},{"title":"title_2","release_date":"2020-3-11"},# duplicate title{"title":"title_3","release_date":"2020-1-51"},])# Validate data framevalid_df,errors_df=AlbumSchema().validate_df(df)# List of valid rowsvalid_rows=[row.asDict(recursive=True)forrowinvalid_df.collect()]##   [#        {'title': 'title_1', 'release_date': datetime.date(2020, 1, 10)},#        {'title': 'title_2', 'release_date': datetime.date(2020, 1, 11)}#   ]## Rows with errorserror_rows=[row.asDict(recursive=True)forrowinerrors_df.collect()]# #   [#        {'_errors': '{"row": {"release_date": "2020-3-11", "title": "title_2", "__count__title": 2}, '#                    '"errors": ["duplicate row"]}'},#        {'_errors': '{"row": {"release_date": "2020-1-51", "title": "title_3", "__count__title": 1}, '#                    '"errors": {"release_date": ["Not a valid date."]}}'}#    ]#

link中讨论了删除重复但先保留的技术。 如果模式中有多个唯一字段,只需将它们添加到UNIQUE,例如UNIQUE=["title", "release_date"]。 您甚至可以通过在列表中分组来指定字段组合的唯一性:

classAlbumSchema(Schema):# Combined values of "title" and "release_date" should be uniqueUNIQUE=[["title","release_date"]]title=fields.Str()release_date=fields.Date()# Input data frame to validate.df=spark.createDataFrame([{"title":"title_1","release_date":"2020-1-10"},{"title":"title_2","release_date":"2020-1-11"},{"title":"title_2","release_date":"2020-3-11"},{"title":"title_3","release_date":"2020-1-21"},{"title":"title_3","release_date":"2020-1-21"},{"title":"title_4","release_date":"2020-1-51"},])# Validate data framevalid_df,errors_df=AlbumSchema().validate_df(df)# List of valid rowsvalid_rows=[row.asDict(recursive=True)forrowinvalid_df.collect()]##   [#        {'title': 'title_1', 'release_date': datetime.date(2020, 1, 10)},#        {'title': 'title_2', 'release_date': datetime.date(2020, 1, 11)},#        {'title': 'title_3', 'release_date': datetime.date(2020, 1, 21)}#   ]## Rows with errorserror_rows=[row.asDict(recursive=True)forrowinerrors_df.collect()]# #   [#        {'_errors': '{"row": {"release_date": "2020-1-21", "title": "title_3", '#                    '"__count__title": 2, "__count__release_date": 2}, '#                    '"errors": ["duplicate row"]}'},#        {'_errors': '{"row": {"release_date": "2020-1-51", "title": "title_4", '#                    '"__count__title": 1, "__count__release_date": 1}, '#                    '"errors": {"release_date": ["Not a valid date."]}}'},#        {'_errors': '{"row": {"release_date": "2020-3-11", "title": "title_2", '#                    '"__count__title": 2, "__count__release_date": 1}, '#                    '"errors": ["duplicate row"]}'}#    ]#

WARNING:重复检查需要对每个唯一字段进行数据洗牌。拥有大量的唯一字段将产生影响 激发工作绩效。默认情况下,UNIQUE被设置为一个空列表,以防止任何重复检查。在

字段

棉花糖有各种不同的字段,可以用来定义模式。内部棉花糖Pypark 将这些字段转换为pyspark SQL数据类型。下表列出了支持的棉花糖字段及其 等效spark SQL数据类型:

MarshmallowPySpark
^{}user specified
^{}^{}
^{}^{}
^{}^{}
^{}^{}
^{}^{}
^{}^{}
^{}^{}
^{}^{}
^{}^{}
^{}^{}

默认情况下,StringType数据类型用于不在上表中的棉花糖字段。spark_schema属性 可用于检查转换后的spark SQL架构:

# Gets the spark schema for the Album schemaAlbumSchema().spark_schema# StructType(List(StructField(title,StringType,true),StructField(release_date,DateType,true),StructField(_errors,StringType,true)))

自定义字段

棉花糖pyspark附带了一个额外的Raw字段的支持。Raw字段不执行任何格式设置 并要求用户指定与字段关联的spark数据类型。请参见以下示例:

frommarshmallow_pysparkimportSchemafrommarshmallow_pyspark.fieldsimportRawfrommarshmallowimportfieldsfrompyspark.sql.typesimportDateTypefromdatetimeimportdateclassAlbumSchema(Schema):title=fields.Str()# Takes python datetime.date objects and treats them as pyspark DateTyperelease_date=Raw(spark_type=DateType())# Input data frame to validate.df=spark.createDataFrame([{"title":"title_1","release_date":date(2020,1,10)},{"title":"title_2","release_date":date(2020,1,11)},{"title":"title_3","release_date":date(2020,3,10)},])# Validate data framevalid_df,errors_df=AlbumSchema().validate_df(df)# List of valid rowsvalid_rows=[row.asDict(recursive=True)forrowinvalid_df.collect()]##   [#        {'title': 'title_1', 'release_date': datetime.date(2020, 1, 10)},#        {'title': 'title_2', 'release_date': datetime.date(2020, 1, 11)},#        {'title': 'title_3', 'release_date': datetime.date(2020, 3, 10)}#   ]## Rows with errorserror_rows=[row.asDict(recursive=True)forrowinerrors_df.collect()]# #   []#

还可以添加对自定义棉花糖字段或上表中缺少的字段的支持。为了这样做, 您需要为自定义字段创建一个转换器。可以使用ConverterABC接口构建转换器:

frommarshmallow_pysparkimportConverterABCfrompyspark.sql.typesimportStringTypeclassEmailConverter(ConverterABC):"""        Converter to convert marshmallow's Email field to a pyspark         SQL data type.    """defconvert(self,ma_field):returnStringType()

提供convert方法中的ma_field参数来处理嵌套字段。例如,您可以签出 ^{cd44}。现在,最后一步是将转换器添加到模式的CONVERTER_MAP属性中:

frommarshmallow_pysparkimportSchemafrommarshmallowimportfieldsclassUser(Schema):name=fields.String(required=True)email=fields.Email(required=True)# Adding email converter to schema.User.CONVERTER_MAP[fields.Email]=EmailConverter# You can now use your schema to validate the input data frame.valid_df,errors_df=User().validate_df(input_df)

发展

要在本地破解棉花糖pyspark:

$ pip install -e .[dev]# to install all dependencies
$ pytest --cov-config .coveragerc --cov=./			# to get coverage report
$ pylint marshmallow_pyspark			# to check code quality with PyLint

您可以选择使用make来执行开发任务。在

许可证

源代码是在Apache许可证版本2下授权的。在

一氧化碳贡献

拉请求永远欢迎!:)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java使用Jackson解析非同构JSON对象数组   java为什么'Stream<T>::<A>toArray(IntFunction<A[]>)`接受没有绑定的类型参数A?   java在JavaFX2.0中获取给定布局中的节点大小?   java双链接列表创建节点   java使用HashMap添加、删除和查找   java中push_back(C++)的等效方法是什么?   java在Jetty中运行servlet时获得HTTP 500   用java显示包含图像和文本的页面的最简单文档格式   swing从选项卡窗格Java中的不同选项卡访问数据   字符串Java帮助检查登录类使用。CSV文件   java Struts 1.2.9动作链接   包含max元素的java列表   currentNode上的jcr Java空检查   在Android中使用OpenNLP的POSTaggerMe时出现java NullPointerException   java Logback只将消息记录到syslog一次   如何用Java编写构造函数的API文档   java从gallery中获取所有图像并存储在阵列中   java Maven:将外部jar文件夹添加到类路径