<p>你的问题很难回答,也很棘手</p>
<p>为了重现你的行为,我用一个大数据集进行了测试</p>
<h2>问题描述</h2>
<p>我在一个大型数据集中测试了以下两个案例:</p>
<pre><code># Case 1
df.count() # Execution time: 37secs
# Case 2
df.filter((df['ID'] == id0)).count() #Execution time: 1.39 min
</code></pre>
<h2>解释</h2>
<p>让我们看看只有<code>.count()</code>的物理计划:</p>
<pre><code>== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#38L])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#41L])
+- *(1) FileScan csv [] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
</code></pre>
<p>让我们先查看物理计划,然后再查看<code>.filter()</code>:</p>
<pre><code>== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#61L])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#64L])
+- *(1) Project
+- *(1) Filter (isnotnull(ID#11) && (ID#11 = Muhammed MacIntyre))
+- *(1) FileScan csv [ID#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [IsNotNull(ID), EqualTo(ID,Muhammed MacIntyre)], ReadSchema: struct<_c1:string>
</code></pre>
<p>通常,Spark when counts of rows映射count=1的行,并减少所有映射器以创建最终行数</p>
<p>在<strong>案例2中</strong>Spark必须首先过滤,然后为每个分区创建部分计数,然后再进行另一个阶段将这些计数相加。因此,对于相同的行,在第二种情况下,Spark也会进行过滤,这会影响大型数据集中的计算时间。Spark是一个用于分布式处理的框架,没有像Pandas这样的索引,可以在不传递所有行的情况下极快地进行过滤</p>
<h2>总结</h2>
<p>在这种简单的情况下,您不能做很多事情来提高执行时间。
您可以使用不同的配置设置(例如#spark.sql.shuffle.partitions、<code># spark.default.parallelism</code>、<code># of executors</code>、<code># executor memory</code>等)尝试应用程序</p>