我学习了如何使用spark cosmosdb connector来创建DataFrame
,现在我想对DataFrame做点什么。在我操作小数据集合之前一切都很好(或者在read配置中添加额外的custom_query
以缩小数据范围)。在
例如,我可以创建一个DF,然后执行df.show()
,或者在其上创建一个临时视图,然后执行%%sql select * from c
。但是当我尝试做df.count()
或%%sql select * from c order by name desc
时,我收到一个错误消息:Request rate is large
(Full stacktrace on pastebin)。我考虑将表具体化为hive(df.write.saveAsTable(tableName)
),我得到了同样的错误。在
有没有一种方法可以在调用如此重的函数时减少对数据库的请求量?或者是另一种在配置单元中具体化数据的方法,这样我就可以在以后处理它,而不需要一次又一次地通过连接器?这个限制看起来真的很麻烦,所以我无法处理这些数据。在
该集合的roughput为400ru/s,HdInsinght的参数为spark2.2onlinux(hdi3.6),Scala:2.11.8。我正在使用带有pyspark3内核的JupyterNotebook。以下是我使用的全部代码:
%%configure -f
{ "name":"Spark-to-Cosmos_DB_Connector",
"jars": ["wasb:///example/jars/1.0.0/azure-cosmosdb-spark_2.2.0_2.11-1.1.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-1.14.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-rx-0.9.0-rc2.jar", "wasb:///example/jars/1.0.0/json-20140107.jar", "wasb:///example/jars/1.0.0/rxjava-1.3.0.jar", "wasb:///example/jars/1.0.0/rxnetty-0.4.20.jar"],
"conf": {
"spark.jars.packages": "com.microsoft.azure:azure-cosmosdb-spark_2.2.0_2.11:1.1.0",
"spark.jars.excludes": "org.scala-lang:scala-reflect"
}
}
iotConfig = {
"Endpoint" : "https://myDB.documents.azure.com:443/",
"Masterkey" : "myKey==",
"Database" : "test",
"preferredRegions" : "West Europe",
"Collection" : "surrogate",
"SamplingRation" : "1.0",
"schema_samplesize" : "1000",
"query_pagesize" : "2147483647"
}
df = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**iotConfig).load()
df.createOrReplaceTempView("c")
#will work
df.show()
#wont work
%%sql
select count(*) from c
#wont work
%%sql
select * from c order by name desc
#wont work
df.count()
#wont work
df.write.saveAsTable('table')
如有任何建议,我们将不胜感激。 提前谢谢。在
也许你应该试着增加你的收藏的吞吐量。这正是
Request rate is large
错误所指示的请注意,在400 RU时,使用
azure-cosmosdb-spark
时,Spark将过快地从集合中请求太多数据。一种潜在的方法是使用pydocumentdb
,其中请求不是来自Spark执行器(比如使用azure-cosmosdb-spark
)而是来自驱动程序。这可能会降低请求速率。在使用
azure-cosmosdb-spark
时,可以通过减小query_pagesize
的大小来降低请求速率。这是configuration reference guide当前正在进行的工作。在我们遇到了一个类似的问题,我们通过
azure-cosmosdb-spark
连接器从cosmosdb
集合(1000个R/U,~25GB,~1500万个文档)中提取数据。在对于平滑的批量拉取,在配置中使用
query_pagesize = <XXXX>
,其中XXXX应该相对较小(比如~100K)。这将确保收集的负载有限,从而平稳响应。在为了在
DataFrame
中进行查询并获取计数,我们使用了以下代码。(斯卡拉)相关问题 更多 >
编程相关推荐