AWS Glue PySpark: splitting a dictionary stored as a string into multiple rows

Posted 2024-10-02 14:24:45


I'm working with a large dataset whose records look like the table below:

uniqueId col1 col2 col3  Levels
1    A1   A2   A3    {"2019-01-01":1.1 ,"2019-01-02":2.1 ,"2019-01-03":3.1}
2    B1   B2   B3    {"2019-01-01":1.2 ,"2019-01-03":3.2}
3    C1   C2   C3    {"2019-01-04":4.3}

The "Levels" column is stored as a string.

I'm trying to split Levels into rows so that I get output like the following:

uniqueId col1 col2 col3  date        value
1    A1   A2   A3    2019-01-01  1.1
1    A1   A2   A3    2019-01-02  2.1
1    A1   A2   A3    2019-01-03  3.1
2    B1   B2   B3    2019-01-01  1.2
2    B1   B2   B3    2019-01-03  3.2
3    C1   C2   C3    2019-01-04  4.3
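
For reference, a minimal snippet that rebuilds the sample input above locally (assuming an active SparkSession named spark; the data and column names are taken from the table):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    (1, "A1", "A2", "A3", '{"2019-01-01":1.1 ,"2019-01-02":2.1 ,"2019-01-03":3.1}'),
    (2, "B1", "B2", "B3", '{"2019-01-01":1.2 ,"2019-01-03":3.2}'),
    (3, "C1", "C2", "C3", '{"2019-01-04":4.3}'),
]
sampleDF = spark.createDataFrame(data, ["uniqueId", "col1", "col2", "col3", "Levels"])
sampleDF.printSchema()  # Levels is a plain string column, not a map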

I tried to apply a script on AWS Glue with PySpark, following the solution proposed here:

PySpark "explode" dict in column

@udf("map<string, string>")
def parse(s):
    try:
        return json.loads(s)
    except json.JSONDecodeError:
        pass 

parse_udf = udf(parse) 



datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "table", transformation_ctx = "datasource0")

sparkDF = datasource0.toDF() 

sparkDF2 = sparkDF.select("unique_id","col1","col2", "col3", explode(parse("levels")).alias("date", "value"))



from awsglue.dynamicframe import DynamicFrame

GlueDF_tmp = DynamicFrame.fromDF(sparkDF2, glueContext, 'GlueDF_tmp')

# The exploded map produces string columns, so the source types below are
# "string"; apply_mapping casts them to timestamp/double on the way out.
GlueDF = GlueDF_tmp.apply_mapping([("unique_id", "string", "unique_id", "string"),
        ("col1", "string", "col1", "string"),
        ("col2", "string", "col2", "string"),
        ("col3", "string", "col3", "string"),
        ("date", "string", "date", "timestamp"),
        ("value", "string", "value", "double")])


glueContext.write_dynamic_frame.from_options(frame = GlueDF, connection_type = "s3", 
     connection_options = {"path": "s3://..."}, 
     format = "parquet", 
     transformation_ctx = "datasink0")

But I'm running into this memory problem: AWS Glue - can't set spark.yarn.executor.memoryOverhead

What would be a better / more efficient way to do this split?
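
One alternative I've been looking at (a sketch, under the assumption that every Levels string is a flat JSON object mapping dates to numbers): replace the Python UDF with Spark's built-in from_json and an explicit MapType schema, so the JSON parsing stays in the JVM and avoids the per-row Python serialization a udf adds:

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import DoubleType, MapType, StringType

# Parse natively in the JVM: keys stay strings, values become doubles.
levels_schema = MapType(StringType(), DoubleType())

sparkDF2 = (sparkDF
    .withColumn("levels_map", from_json(col("levels"), levels_schema))
    .select("unique_id", "col1", "col2", "col3",
            explode("levels_map").alias("date", "value"))
    .withColumn("date", col("date").cast("timestamp")))

Since from_json already yields double values and the date is cast here, the apply_mapping step would only need to pass the columns through unchanged.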

