在Sp中处理CosmosDB的大数据集

2024-09-27 19:25:24 发布

您现在位置:Python中文网/ 问答频道 /正文

我学习了如何使用spark cosmosdb connector来创建DataFrame,现在我想对DataFrame做点什么。在我操作小数据集合之前一切都很好(或者在read配置中添加额外的custom_query以缩小数据范围)。在

例如,我可以创建一个DF,然后执行df.show(),或者在其上创建一个临时视图,然后执行%%sql select * from c。但是当我尝试做df.count()%%sql select * from c order by name desc时,我收到一个错误消息:Request rate is largeFull stacktrace on pastebin)。我考虑将表具体化为hive(df.write.saveAsTable(tableName)),我得到了同样的错误。在

有没有一种方法可以在调用如此重的函数时减少对数据库的请求量?或者是另一种在配置单元中具体化数据的方法,这样我就可以在以后处理它,而不需要一次又一次地通过连接器?这个限制看起来真的很麻烦,所以我无法处理这些数据。在

该集合的roughput为400ru/s,HdInsinght的参数为spark2.2onlinux(hdi3.6),Scala:2.11.8。我正在使用带有pyspark3内核的JupyterNotebook。以下是我使用的全部代码:

%%configure -f
{ "name":"Spark-to-Cosmos_DB_Connector", 
  "jars": ["wasb:///example/jars/1.0.0/azure-cosmosdb-spark_2.2.0_2.11-1.1.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-1.14.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-rx-0.9.0-rc2.jar", "wasb:///example/jars/1.0.0/json-20140107.jar", "wasb:///example/jars/1.0.0/rxjava-1.3.0.jar", "wasb:///example/jars/1.0.0/rxnetty-0.4.20.jar"],
  "conf": {
        "spark.jars.packages": "com.microsoft.azure:azure-cosmosdb-spark_2.2.0_2.11:1.1.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}

iotConfig = {
    "Endpoint" : "https://myDB.documents.azure.com:443/",
    "Masterkey" : "myKey==",
    "Database" : "test",
    "preferredRegions" : "West Europe",
    "Collection" : "surrogate",
    "SamplingRation" : "1.0",
    "schema_samplesize" : "1000",
    "query_pagesize" : "2147483647"
}

df = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**iotConfig).load()

df.createOrReplaceTempView("c")

#will work
df.show()

#wont work
%%sql
select count(*) from c

#wont work
%%sql
select * from c order by name desc

#wont work    
df.count()

#wont work    
df.write.saveAsTable('table')

如有任何建议,我们将不胜感激。 提前谢谢。在


Tags: 数据fromdfsqlexamplecountazureselect
3条回答

也许你应该试着增加你的收藏的吞吐量。这正是Request rate is large错误所指示的

请注意,在400 RU时,使用azure-cosmosdb-spark时,Spark将过快地从集合中请求太多数据。一种潜在的方法是使用pydocumentdb,其中请求不是来自Spark执行器(比如使用azure-cosmosdb-spark)而是来自驱动程序。这可能会降低请求速率。在

使用azure-cosmosdb-spark时,可以通过减小query_pagesize的大小来降低请求速率。这是configuration reference guide当前正在进行的工作。在

我们遇到了一个类似的问题,我们通过azure-cosmosdb-spark连接器从cosmosdb集合(1000个R/U,~25GB,~1500万个文档)中提取数据。在

对于平滑的批量拉取,在配置中使用query_pagesize = <XXXX>,其中XXXX应该相对较小(比如~100K)。这将确保收集的负载有限,从而平稳响应。在

为了在DataFrame中进行查询并获取计数,我们使用了以下代码。(斯卡拉)

df.createOrReplaceTempView("c") 

var records = spark.sql("SELECT count(*) FROM c") 

records.show()`

相关问题 更多 >

    热门问题