SparkAutoMapper

A detailed description of the sparkautomapper Python project



A fluent API to map data from one view to another in Spark.

Uses native Spark functions underneath, so it is just as fast as a hand-written transformation.

Since this is just Python, you can use any Python editor. And since everything is typed using Python types, most editors will auto-complete and warn you when you make a mistake.

Usage

```
pip install sparkautomapper
```

SparkAutoMapper input and output

You can pass in a data frame to SparkAutoMapper, or specify the name of a Spark view to read from.

You can receive the result as a data frame, or optionally pass in the name of a view where you want the result.
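Conceptually, a mapper is a keyed, row-by-row transform from a source view into a destination view. The following pure-Python sketch is an illustration of that idea only; it is NOT the SparkAutoMapper API, and the `map_rows` helper is hypothetical:

```python
# Conceptual sketch only -- NOT the SparkAutoMapper API.
# Rows from a source view become rows of a destination view: the key column
# is carried through, and each destination column gets one rule (a constant,
# or a function of the source row).

def map_rows(source_rows, key, column_rules):
    """Apply per-column rules to each source row, keeping the key column."""
    result = []
    for row in source_rows:
        out = {key: row[key]}
        for dst_col, rule in column_rules.items():
            out[dst_col] = rule(row) if callable(rule) else rule
        result.append(out)
    return result


patients = [
    {"member_id": 1, "src1": "Imran"},
    {"member_id": 2, "src1": "Michael"},
]
members = map_rows(patients, "member_id", {"dst1": lambda r: r["src1"]})
# members == [{'member_id': 1, 'dst1': 'Imran'}, {'member_id': 2, 'dst1': 'Michael'}]
```

The real library compiles such rules down to native Spark column expressions instead of looping over rows.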

Dynamically typed examples

Set a column in destination to a text value (read from the passed-in data frame and return the result in a new data frame)


```python
from spark_auto_mapper.automappers.automapper import AutoMapper

mapper = AutoMapper(
    keys=["member_id"]
).columns(
    dst1="hello"
)
```

Set a column in destination to a text value (read from a Spark view and put the result in another Spark view)


```python
from spark_auto_mapper.automappers.automapper import AutoMapper

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst1="hello"
)
```

Set a column in destination to an int value


```python
from spark_auto_mapper.automappers.automapper import AutoMapper

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst1=1050
)
```

Copy a column (src1) from the source view to a column (dst1) in the destination view

```python
from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst1=A.column("src1")
)
```

You can also use a shortcut to specify a column (wrap the column name in [])

```python
from spark_auto_mapper.automappers.automapper import AutoMapper

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst1="[src1]"
)
```
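The `[name]` shorthand simply distinguishes a column reference from a string literal. A rough pure-Python illustration of how such shorthand could be interpreted (a hypothetical helper, not the library's actual parser):

```python
def interpret_value(value):
    """Hypothetical illustration: treat "[name]" as a column reference,
    anything else as a literal value."""
    if isinstance(value, str) and value.startswith("[") and value.endswith("]"):
        return ("column", value[1:-1])
    return ("literal", value)


interpret_value("[src1]")   # a reference to column src1
interpret_value("hello")    # the literal string "hello"
```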

Convert the data type of a column (or string literal)

```python
from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    birthDate=A.date(A.column("date_of_birth"))
)
```

Use a Spark SQL expression (any valid Spark SQL expression can be used)

```python
from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    gender=A.expression(
        """
        CASE
            WHEN `Member Sex` = 'F' THEN 'female'
            WHEN `Member Sex` = 'M' THEN 'male'
            ELSE 'other'
        END
        """
    )
)
```
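The CASE expression above encodes a simple value mapping. For clarity, its semantics written as an equivalent plain-Python function (an illustration only; the actual evaluation happens inside Spark SQL):

```python
def map_gender(member_sex):
    # Mirrors the Spark SQL CASE expression in the example above
    if member_sex == 'F':
        return 'female'
    if member_sex == 'M':
        return 'male'
    return 'other'
```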

Specify multiple transformations

```python
from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst1="[src1]",
    birthDate=A.date("[date_of_birth]"),
    gender=A.expression(
        """
        CASE
            WHEN `Member Sex` = 'F' THEN 'female'
            WHEN `Member Sex` = 'M' THEN 'male'
            ELSE 'other'
        END
        """
    )
)
```

Use variables or parameters

```python
from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A


def mapping(parameters: dict):
    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst1=A.column(parameters["my_column_name"])
    )
```

Use conditional logic

```python
from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A


def mapping(parameters: dict):
    mapper = AutoMapper(
        view="members",
        source_view="patients",
        keys=["member_id"]
    ).columns(
        dst1=A.column(parameters["my_column_name"])
    )
    if parameters["customer"] == "Microsoft":
        mapper = mapper.columns(
            important_customer=1,
            customer_name=parameters["customer"]
        )
    return mapper
```

Use nested array columns

```python
from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).withColumn(
    dst2=A.list(["address1", "address2"])
)
```

Use nested struct columns

```python
from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst2=A.complex(
        use="usual",
        family="imran"
    )
)
```

Use a list of structs

```python
from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

mapper = AutoMapper(
    view="members",
    source_view="patients",
    keys=["member_id"]
).columns(
    dst2=A.list(
        [
            A.complex(use="usual", family="imran"),
            A.complex(use="usual", family="[last_name]")
        ]
    )
)
```
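For orientation, this is the conceptual shape of `dst2` in one output row, assuming a source row whose `last_name` is `"Qureshi"` (hypothetical data sketched by hand, not actual library output):

```python
# Conceptual shape of one output row: dst2 is an array of structs, with the
# "[last_name]" reference resolved against the source row (assumed here).
row = {
    "member_id": 1,
    "dst2": [
        {"use": "usual", "family": "imran"},
        {"use": "usual", "family": "Qureshi"},  # "[last_name]" resolved from source
    ],
}
```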

Executing the AutoMapper

```python
from pyspark.sql import DataFrame

# Assumes an active SparkSession named `spark`
spark.createDataFrame(
    [
        (1, 'Qureshi', 'Imran'),
        (2, 'Vidal', 'Michael'),
    ],
    ['member_id', 'last_name', 'first_name']
).createOrReplaceTempView("patients")

source_df: DataFrame = spark.table("patients")

df = source_df.select("member_id")
df.createOrReplaceTempView("members")

result_df: DataFrame = mapper.transform(df=df)
```

Statically typed examples

To further improve auto-complete and syntax checking, you can define complex types:

Define a custom data type:

```python
from spark_auto_mapper.type_definitions.automapper_defined_types import AutoMapperTextInputType
from spark_auto_mapper.helpers.automapper_value_parser import AutoMapperValueParser
from spark_auto_mapper.data_types.date import AutoMapperDateDataType
from spark_auto_mapper.data_types.list import AutoMapperList
from spark_auto_mapper_fhir.fhir_types.automapper_fhir_data_type_complex_base import AutoMapperFhirDataTypeComplexBase


class AutoMapperFhirDataTypePatient(AutoMapperFhirDataTypeComplexBase):
    # noinspection PyPep8Naming
    def __init__(
        self,
        id_: AutoMapperTextInputType,
        birthDate: AutoMapperDateDataType,
        name: AutoMapperList,
        gender: AutoMapperTextInputType
    ) -> None:
        super().__init__()
        self.value = dict(
            id=AutoMapperValueParser.parse_value(id_),
            birthDate=AutoMapperValueParser.parse_value(birthDate),
            name=AutoMapperValueParser.parse_value(name),
            gender=AutoMapperValueParser.parse_value(gender)
        )
```

Now you get auto-complete and syntax checking:

```python
from spark_auto_mapper.automappers.automapper import AutoMapper
from spark_auto_mapper.helpers.automapper_helpers import AutoMapperHelpers as A

# AutoMapperFhir and the FHIR helper F are provided by the companion
# spark_auto_mapper_fhir package
mapper = AutoMapperFhir(
    view="members",
    source_view="patients",
    keys=["member_id"]
).withResource(
    resource=F.patient(
        id_=A.column("a.member_id"),
        birthDate=A.date(A.column("date_of_birth")),
        name=A.list(
            F.human_name(
                use="usual",
                family=A.column("last_name")
            )
        ),
        gender="female"
    )
)
```

Publishing a new package

  1. Edit the version to increment it
  2. Create a new release
  3. The GitHub Action should kick in automatically and publish the package
  4. You can see the status in the Actions tab

java在某些练习中避免索引异常   java Android,如何在具有socket的网络区域上提高性能?   更改web中的执行顺序后,JavaServlet过滤器不起作用。xml   java如何绑定泛型类?   JavaGmail RESTAPI:使用Google凭证而不模拟   java是解码整数序列的最快方法   java根据hashmaps的值(通过map的值进行比较)对hashmaps的数组列表进行排序   用于JBoss 7.1或Apache的java负载平衡器,带有Healt检查   java非常慢的MySQL读取性能   java如何在使用iRetryAnalyzer时从Windows CMD关闭Selenium WebDriver?   java随机闪烁仅出现在Galaxy Note 4上   java AttributeOverride MappedSuperClass属性的类型不同   java JPA:如何检测现有实体是否已更新?   java如何使用mavenassemblyplugin从dependencySet中删除METAINF?   安装SecurityManager时,java MQQueueManager构造函数挂起