通过Spark connector在CosmosDB图形中附加多个属性值

2024-10-03 02:34:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我们希望使用其Spark连接器将新值附加到CosmosDB图的顶点的属性中

假设我们有一个id为'a'的顶点,其属性name的值为Andrea。如果我们再次遇到id为'a'且属性name(比如Alice)的值不同的顶点,我们希望将Alice附加到顶点属性中。
因此,顶点将具有一个属性name,该属性具有两个不同的值:AndreaAlice

使用Gremlin,可以使用.property(list, 'name', 'Alice')来完成,但是我们没有找到使用Spark连接器的方法。我们看到了docs,我们认为可以使用spark.cosmos.write.strategy = ItemAppend来实现,但实际上顶点属性没有更新

Info:对于Spark连接器,我们使用了这个link的代码


示例代码

from pyspark.sql.functions import *

# starting graph
v = sqlContext.createDataFrame([
  ("a", "Andrea", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30)
], ["id", "name", "age"]) \
.withColumn("entity", lit("person"))



def to_cosmosdb_vertices(dfVertices, labelColumn, partitionKey = ""):
  
  columns = ["id", labelColumn]
  
  if partitionKey:
    columns.append(partitionKey)
  
  columns.extend(['nvl2({x}, array(named_struct("id", uuid(), "_value", {x})), NULL) AS {x}'.format(x=x) \
                for x in dfVertices.columns if x not in columns])
 
  return dfVertices.selectExpr(*columns).withColumnRenamed(labelColumn, "label")

cosmosDbVertices = to_cosmosdb_vertices(v, "entity")



cosmosDbConfig = {
  "spark.cosmos.accountEndpoint" : "https://<COSMOSDB_ENDPOINT>.documents.azure.com:443/",
  "spark.cosmos.accountKey" : "<COSMOSDB_PRIMARYKEY>",
  "spark.cosmos.database" : "<DATABASE>",
  "spark.cosmos.container" : "<COLLECTION>",
  "spark.cosmos.write.strategy" : "ItemAppend"   # we tried ItemAppend here
}

cosmosDbFormat = "cosmos.oltp"

cosmosDbVertices.write.format(cosmosDbFormat).mode("append").options(**cosmosDbConfig).save()



# now we try to insert a new value for property 'name' of id 'a'
v = sqlContext.createDataFrame([
  ("a", "Alice", None)
], ["id", "name", "age"]) \
.withColumn("entity", lit("person"))

cosmosDbVertices = to_cosmosdb_vertices(v, "entity")

cosmosDbVertices.write.format(cosmosDbFormat).mode("append").options(**cosmosDbConfig).save()

Tags: columnstonameid属性sparkwriteentity