使用pyspark将作业粘附到联合数据帧?

2024-09-30 18:27:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我基本上是在尝试从一个DF向另一个DF更新/添加行。 这是我的密码:

# S3
import boto3

# SOURCE
source_table = "someDynamoDbtable"
source_s3 = "s://mybucket/folder/"

# DESTINATION
destination_bucket = "s3://destination-bucket"

#Select which attributes to update/add
params = ['attributeD', 'attributeF', 'AttributeG']

#spark wrapper
glueContext = GlueContext(SparkContext.getOrCreate())

newData = glueContext.create_dynamic_frame.from_options(connection_type = "dynamodb", connection_options = {"tableName": source_table})
newValues = newData.select_fields(params)
newDF = newValues.toDF()

oldData = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [source_s3]}, format="orc", format_options={}, transformation_ctx="dynamic_frame")
oldDataValues = oldData.drop_fields(params)
oldDF = oldDataValues.toDF()

#makes a union of the dataframes
rebuildData = oldDF.union(newDF)
#error happens here
readyData = DynamicFrame.fromDF(rebuildData, glueContext, "readyData")

#writes new data to s3 destination, into orc files, while partitioning
glueContext.write_dynamic_frame.from_options(frame = readyData, connection_type = "s3", connection_options = {"path": destination_bucket}, format = "orc", partitionBy=['partition_year', 'partition_month', 'partition_day'])

我得到的错误是SyntaxError:invalid syntaxon linereadyData=。。。 到目前为止,我还不知道怎么了,我的头从所有的空白代码开始痛。 如果你见过类似的东西,请给我一根骨头。你知道吗


Tags: fromformatsources3buckettypedynamicparams
2条回答

您正在执行dataframe和dynamicframe之间的并集操作。你知道吗

这将创建名为newData的动态帧和名为newDF的数据帧:

newData = glueContext.create_dynamic_frame.from_options(connection_type = "dynamodb", connection_options = {"tableName": source_table})
newValues = newData.select_fields(params)
newDF = newValues.toDF()

这将创建名为oldData的动态帧和名为oldDF的数据帧:

oldData = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [source_s3]}, format="orc", format_options={}, transformation_ctx="dynamic_frame")
oldDataValues = oldData.drop_fields(params)
oldDF = oldDataValues.toDF()

您正在对上述两个实体执行联合操作,如下所示:

rebuildData = oldDF.union(newData)

应该是:

rebuildData = oldDF.union(newDF)

是的,所以我想我需要做的是使用外部连接。 让我解释一下:

  • 我加载两个数据帧,其中一个删除我们要更新的字段。你知道吗
  • 第二个只选择这些字段,因此这两个字段都不会有重复的行/列。你知道吗
  • 我们使用外部(或完全)连接,而不是仅仅添加行的联合。这将添加我的数据帧中的所有数据,而不重复。你知道吗

现在,我的逻辑可能有缺陷,但到目前为止,它是工作的好我。如果有人正在寻找类似的解决方案,欢迎您使用。 我的代码已更改:

rebuildData = oldDF.join(newData, 'id', 'outer')

相关问题 更多 >