我们希望使用其Spark连接器将新值附加到CosmosDB图的顶点的属性中
假设我们有一个id为'a'
的顶点,其属性name
的值为Andrea
。如果我们再次遇到id为'a'
且属性name
(比如Alice
)的值不同的顶点,我们希望将Alice
附加到顶点属性中。
因此,顶点将具有一个属性name
,该属性具有两个不同的值:Andrea
和Alice
使用Gremlin,可以使用.property(list, 'name', 'Alice')
来完成,但是我们没有找到使用Spark连接器的方法。我们看到了docs,我们认为可以使用spark.cosmos.write.strategy = ItemAppend
来实现,但实际上顶点属性没有更新
Info:对于Spark连接器,我们使用了这个link的代码
from pyspark.sql.functions import *
# starting graph
v = sqlContext.createDataFrame([
("a", "Andrea", 34),
("b", "Bob", 36),
("c", "Charlie", 30)
], ["id", "name", "age"]) \
.withColumn("entity", lit("person"))
def to_cosmosdb_vertices(dfVertices, labelColumn, partitionKey = ""):
columns = ["id", labelColumn]
if partitionKey:
columns.append(partitionKey)
columns.extend(['nvl2({x}, array(named_struct("id", uuid(), "_value", {x})), NULL) AS {x}'.format(x=x) \
for x in dfVertices.columns if x not in columns])
return dfVertices.selectExpr(*columns).withColumnRenamed(labelColumn, "label")
cosmosDbVertices = to_cosmosdb_vertices(v, "entity")
cosmosDbConfig = {
"spark.cosmos.accountEndpoint" : "https://<COSMOSDB_ENDPOINT>.documents.azure.com:443/",
"spark.cosmos.accountKey" : "<COSMOSDB_PRIMARYKEY>",
"spark.cosmos.database" : "<DATABASE>",
"spark.cosmos.container" : "<COLLECTION>",
"spark.cosmos.write.strategy" : "ItemAppend" # we tried ItemAppend here
}
cosmosDbFormat = "cosmos.oltp"
cosmosDbVertices.write.format(cosmosDbFormat).mode("append").options(**cosmosDbConfig).save()
# now we try to insert a new value for property 'name' of id 'a'
v = sqlContext.createDataFrame([
("a", "Alice", None)
], ["id", "name", "age"]) \
.withColumn("entity", lit("person"))
cosmosDbVertices = to_cosmosdb_vertices(v, "entity")
cosmosDbVertices.write.format(cosmosDbFormat).mode("append").options(**cosmosDbConfig).save()
目前没有回答
相关问题 更多 >
编程相关推荐