为什么这个简单的Spark程序不使用多个内核?

2024-05-01 21:44:01 发布

您现在位置:Python中文网/ 问答频道 /正文

所以,我在一个16核多核系统上运行这个简单的程序。我管理它 通过发布以下内容。

spark-submit --master local[*] pi.py

程序代码如下。

#"""pi.py"""
from pyspark import SparkContext
import random

N = 12500000

def sample(p):
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

sc = SparkContext("local", "Test App")
count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

当我使用top查看CPU时 消耗,只有一个核心正在被利用。为什么会这样?塞康迪,斯帕克 文档说明默认并行性包含在属性中 spark.default.parallelism。我怎么能从我的 python程序?


Tags: samplepyimport程序masterlocal系统count
3条回答

若要更改CPU核心消耗,请在spark-installation-directory/conf中的spark-env.sh文件中设置工作线程要使用的核心数 这是通过spark-env.sh文件中的SPARK_EXECUTOR_CORES属性完成的。 默认情况下,该值设置为1。

可能是因为对sc.parallelize的调用将所有数据放在一个单独的分区中。可以将分区数指定为要并行化的第二个参数:

part = 16
count = sc.parallelize(xrange(N), part).map(sample).reduce(lambda a, b: a + b)

注意,这仍然会在驱动程序中使用一个CPU生成1200万个点,然后只将它们分散到16个分区来执行reduce步骤。

一种更好的方法是在分区之后完成大部分工作:例如,下面的方法只在驱动程序上生成一个很小的数组,然后让每个远程任务生成实际的随机数和随后的PI近似值:

part = 16
count = ( sc.parallelize([0] * part, part)
           .flatMap(lambda blah: [sample(p) for p in xrange( N/part)])
           .reduce(lambda a, b: a + b)
       )

最后,(因为我们越懒越好),spark mllib实际上已经有了一个很好并行化的随机数据生成,看看这里:http://spark.apache.org/docs/1.1.0/mllib-statistics.html#random-data-generation。因此,可能以下内容与您尝试的内容很接近(未测试=>;可能不起作用,但希望应该很接近)

count = ( RandomRDDs.uniformRDD(sc, N, part)
        .zip(RandomRDDs.uniformRDD(sc, N, part))
        .filter (lambda (x, y): x*x + y*y < 1)
        .count()
        )

因为上面这些都不适合我(也许是因为我不太了解他们),这是我的两分钱。

我是从spark-submit program.py开始工作的,在我的文件中有sc = SparkContext("local", "Test")。我试着用sc.defaultParallelism验证spark的核数。原来是1。当我将上下文初始化更改为sc = SparkContext("local[*]", "Test")时,它变为16(系统的核心数),我的程序使用所有的核心。

我对spark还不太熟悉,但我的理解是,默认情况下,local表示使用一个内核,当它在程序中设置时,它会覆盖其他设置(在我的情况下,它会覆盖配置文件和环境变量中的设置)。

相关问题 更多 >