I'm working with a large dataset where my records look like the table below:
uniqueId  col1  col2  col3  Levels
1         A1    A2    A3    {"2019-01-01":1.1, "2019-01-02":2.1, "2019-01-03":3.1}
2         B1    B2    B3    {"2019-01-01":1.2, "2019-01-03":3.2}
3         C1    C2    C3    {"2019-01-04":4.3}
"Levels" is stored as a string. I'm trying to split Levels into rows so that I get output like this:

uniqueId  col1  col2  col3  date        value
1         A1    A2    A3    2019-01-01  1.1
1         A1    A2    A3    2019-01-02  2.1
1         A1    A2    A3    2019-01-03  3.1
2         B1    B2    B3    2019-01-01  1.2
2         B1    B2    B3    2019-01-03  3.2
3         C1    C2    C3    2019-01-04  4.3
I tried applying a script on AWS Glue with PySpark, following the solution proposed in PySpark "explode" dict in column:
import json
from pyspark.sql.functions import explode, udf

# The decorator already registers parse as a UDF returning map<string, string>,
# so no separate udf(parse) call is needed
@udf("map<string, string>")
def parse(s):
    try:
        return json.loads(s)
    except json.JSONDecodeError:
        pass  # unparseable strings become null
from awsglue.dynamicframe import DynamicFrame

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "table", transformation_ctx = "datasource0")
sparkDF = datasource0.toDF()

# explode() on the parsed map produces one row per key/value pair
sparkDF2 = sparkDF.select("unique_id", "col1", "col2", "col3", explode(parse("levels")).alias("date", "value"))

GlueDF_tmp = DynamicFrame.fromDF(sparkDF2, glueContext, 'GlueDF_tmp')
# explode() yields string columns, so the source types here are "string"
GlueDF = GlueDF_tmp.apply_mapping([("unique_id", "string", "unique_id", "string"),
                                   ("col1", "string", "col1", "string"),
                                   ("col2", "string", "col2", "string"),
                                   ("col3", "string", "col3", "string"),
                                   ("date", "string", "date", "timestamp"),
                                   ("value", "string", "value", "double")])
glueContext.write_dynamic_frame.from_options(frame = GlueDF, connection_type = "s3",
                                             connection_options = {"path": "s3://..."},
                                             format = "parquet",
                                             transformation_ctx = "datasink0")
But I'm running into the kind of memory problem described in AWS Glue - can't set spark.yarn.executor.memoryOverhead.
What would be a better/more efficient way to do this split?
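For reference, one commonly suggested alternative is to skip the Python UDF entirely and parse the column with Spark's built-in from_json. The parsing then happens inside the JVM, which avoids the Python-worker serialization overhead that often contributes to memoryOverhead failures. A minimal sketch, assuming the JSON values can be read directly as doubles (column names are taken from the script above):

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import DoubleType, MapType, StringType

# Parse the JSON string natively in Spark: no Python UDF involved
levels_schema = MapType(StringType(), DoubleType())

sparkDF2 = (sparkDF
    .withColumn("levels_map", from_json(col("levels"), levels_schema))
    # explode() emits one row per key/value pair in the map
    .select("unique_id", "col1", "col2", "col3",
            explode("levels_map").alias("date", "value"))
    .withColumn("date", col("date").cast("timestamp")))

With date and value already typed in Spark, the apply_mapping step only needs to pass the columns through unchanged.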