How to run multiple jobs in one SparkContext from separate threads in PySpark?

Posted 2024-06-12 18:20:42


From the Spark documentation, regarding Scheduling Within an Application:

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).

I could hardly find any example code for this, even in Scala or Java. Can someone give an example of how to achieve it with PySpark?


2 Answers

I ran into the same problem, so I created a small self-contained example. I create multiple threads using Python's threading module and submit multiple Spark jobs simultaneously.

Note that by default Spark runs the jobs in FIFO order (see http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application). In the example below I change it to FAIR scheduling.

# Prereqs:
# set 
# spark.dynamicAllocation.enabled         true
# spark.shuffle.service.enabled           true
# spark.scheduler.mode                    FAIR
# in spark-defaults.conf

import threading
from pyspark import SparkContext, SparkConf

def task(sc, i):
  print(sc.parallelize(range(i*10000)).count())

def run_multiple_jobs():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  for i in range(4):
    t = threading.Thread(target=task, args=(sc, i))
    t.start()
    print('spark task', i, 'has started')


run_multiple_jobs()

Output:

spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0 
10000
20000
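
For finer control over how those threads share the cluster, the same job-scheduling page also describes fair scheduler pools. Below is a minimal sketch (the pool names are illustrative, not part of the original answer) that puts each thread's jobs into its own pool via sc.setLocalProperty, which Spark keeps thread-local:

def task_in_pool(sc, i):
  # setLocalProperty is thread-local, so jobs submitted from this thread
  # go to their own fair-scheduler pool
  sc.setLocalProperty('spark.scheduler.pool', 'pool_%d' % i)  # hypothetical pool name
  print(sc.parallelize(range(i*10000)).count())
  sc.setLocalProperty('spark.scheduler.pool', None)  # unset the pool for this thread

Setting the property to None unsets it again, so later jobs from the same thread fall back to the default pool.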

I was asking myself the same thing today. The multiprocessing module provides a ThreadPool, which spawns a few threads for you and thus runs the jobs in parallel. First instantiate the function, then create the pool, and then map it over the range you want to iterate over.

In my case, I was computing the WSSSE numbers for different numbers of centers (hyperparameter tuning) to get a "good" k-means clustering, just as described in the Spark MLlib documentation. Without further explanation, here are some cells from my IPython worksheet:

from pyspark.mllib.clustering import KMeans
import numpy as np

c_points are 12-dimensional arrays:

>>> c_points.cache()
>>> c_points.take(3)
[array([ 1, -1,  0,  1,  0,  0,  0,  0,  0,  0,  0,  0]),
array([-2,  0,  0,  1,  0,  0,  0,  0,  0,  0,  0,  0]),
array([ 7, -1,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0])]

In the following, for each i I compute the WSSSE value and return it as a tuple:

def error(point, clusters):
    center = clusters.centers[clusters.predict(point)]
    return np.linalg.norm(point - center)

def calc_wssse(i):
    clusters = KMeans.train(c_points, i, maxIterations=20,
        runs=20, initializationMode="random")
    WSSSE = c_points\
        .map(lambda point: error(point, clusters))\
        .reduce(lambda x, y: x + y)
    return (i, WSSSE)

Now comes the interesting part:

from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=4)

Run it:

wssse_points = tpool.map(calc_wssse, range(1, 30))
wssse_points

which gives:

[(1, 195318509740785.66),
 (2, 77539612257334.33),
 (3, 78254073754531.1),
 ...
]
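
The same pattern also works with the standard library's concurrent.futures module; here is a minimal sketch, assuming the calc_wssse function defined above:

from concurrent.futures import ThreadPoolExecutor

# ThreadPoolExecutor.map behaves like ThreadPool.map and returns results in submission order
with ThreadPoolExecutor(max_workers=4) as executor:
    wssse_points = list(executor.map(calc_wssse, range(1, 30)))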
