为什么local[*]不使用我机器中所有可用的核心?

2024-10-01 15:46:58 发布

您现在位置:Python中文网/ 问答频道 /正文

如果这个问题已经得到回答,我很抱歉。我确实看过档案,但我没有找到我的问题的具体答案。在

我是新手。我尝试在本地并行运行这个简单的例子,在我的macossierra机器中使用spark-2.1.1。因为我有4个核心,有4个任务,每个任务需要10秒,所以我希望总共花费10秒多一点。在

我看到每项任务都需要预期的时间。但在我看来只有两条执行线索。我本来要4个的。正如您在代码中看到的,每个元组的值就是相应任务的执行时间。在

insight086:Pypark lquesada多产出/部分-00000

(u'1', 10.000892877578735)
(u'3', 10.000878095626831)

insight086:pyspark lquesada$more output/part-00001

^{pr2}$

同时,所花费的总时间也远远超过20秒:

^{3}$

提前感谢您的帮助!在

干杯, 路易斯

输入文件:

1
2
3
4

脚本:

from pyspark import SparkContext
import time

def mymap(word):
    start = time.time()
    time.sleep(10)
    et=time.time()-start
    return (word, et)

def main():
    start = time.time()
    sc = SparkContext(appName='SparkWordCount')

    input_file = sc.textFile('/Users/lquesada/Dropbox/hadoop/pyspark/input.txt')
    counts = input_file.flatMap(lambda line: line.split()) \
                     .map(mymap) \
                     .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile('/Users/lquesada/Dropbox/hadoop/pyspark/output')

    sc.stop()
    print 'total_time',time.time()-start

if __name__ == '__main__':
   main()

Tags: importinputoutputtimemaindef时间start
1条回答
网友
1楼 · 发布于 2024-10-01 15:46:58

这就是为什么Divide and conquer algorithms有它们的阈值,在那里使用它们是有意义的。将分布添加到Spark中的mix(并行)中,您就有相当多的机器来完成这样一个很小的计算。你只是没有利用Spark的优势与这个4元素的数据集。在

假设随着越来越大的数据集,时间将集中在您的期望值附近。在

另外,当读取本地数据集时,分区的数量最多为2,因此如果没有repartitioning,则只能使用2个核心。在

repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.

If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.


local[*]意味着使用尽可能多的核心(参见SparkContextLOCAL_N_REGEX的情况):

def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
val threadCount = if (threads == "*") localCpuCount else threads.toInt

它只是提示默认使用多少个分区,但不能阻止Spark的上升或下降。它主要取决于Spark应用的优化,最终为您的分布式计算提供最佳的执行计划。Spark为您做了很多工作,抽象级别越高,优化越多(参见sparksql的优化器中的batches)。在

相关问题 更多 >

    热门问题