在使用Python SDK的Spark上运行Apache Beam wordcount管道时,并行度低

2024-04-29 01:11:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我对Spark群集配置和Pyspark管道的运行非常有经验,但我只是从Beam开始。因此,我试图在Spark PortableRunner上对Pyspark和Beam python SDK进行苹果对苹果的比较(运行在同一个小Spark集群上,每个集群有4个工作线程,每个工作线程有4个内核和8GB RAM),我已经确定了一个相当大的数据集的字数计算工作,将结果存储在拼花地板表中

因此,我下载了50GB的Wikipedia文本文件,分成大约100个未压缩文件,并将它们存储在/mnt/nfs_drive/wiki_files/目录中(/mnt/nfs_drive是安装在所有Worker上的NFS驱动器)

首先,我运行以下Pyspark wordcount脚本:

from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'

spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()

spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: Row(word=x[0], count=x[1]))

spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')

脚本运行良好,并在大约8分钟内将拼花文件输出到所需位置。主阶段(读取和拆分令牌)划分为合理数量的任务,以便有效地使用集群: enter image description here

我现在正试图用Beam和便携式跑步器实现同样的效果。首先,我使用以下命令启动了Spark作业服务器(在Spark master节点上):

docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077

然后,在主节点和工作节点上,我运行SDK线束,如下所示:

 docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool

现在Spark cluster已设置为运行梁管道,我可以提交以下脚本:

import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--job_name=WordCountBeam"
])


wiki_files = '/mnt/nfs_drive/wiki_files/*'

p = beam.Pipeline(options=options)
beam_counts = (
    p
    | fileio.MatchFiles(wiki_files)
    | beam.Map(lambda x: x.path)
    | beam.io.ReadAllFromText()
    | 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
    | beam.combiners.Count.PerElement()
    | beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)


_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
      pyarrow.schema(
          [('word', pyarrow.binary()), ('count', pyarrow.int64())]
      )
)

result = p.run().wait_until_finish()

代码提交成功后,我可以在Spark UI上看到作业,工作人员正在执行它。然而,即使运行超过1小时,它也不会产生任何输出

因此,我想确保我的设置没有问题,所以我在一个较小的数据集(只有一个Wiki文件)上运行了完全相同的脚本。这将在大约3.5分钟内成功完成(相同数据集上的Spark wordcount需要16秒!)

我想知道Beam怎么会那么慢,所以我开始研究Beam管道通过作业服务器提交给Spark的DAG。我注意到Spark工作的大部分时间都花在以下阶段: enter image description here

这被分成两个任务,如下所示: enter image description here

打印调试行显示此任务是执行“繁重工作”(即从wiki文件读取行和拆分标记)的地方-但是,由于这只发生在两个任务中,因此工作最多将分配给两个工作人员。同样有趣的是,在大型50GB数据集上运行会导致完全相同的DAG,而完全相同数量的任务

我很不确定如何进一步进行。Beam管道似乎降低了并行性,但我不确定这是否是由于作业服务器对管道的次优转换,或者是否应该以其他方式指定PTransforms以增加Spark上的并行性

任何建议,谢谢


Tags: 数据lambdaimport管道apachewikifilesdrive
2条回答

管道的文件IO部分可以通过使用apache_beam.io.textio.ReadFromText(file_pattern='/mnt/nfs_drive/wiki_files/*')来简化

Fusion是可能阻止并行性的另一个原因。解决方案是在读取所有文件后插入一个apache_beam.transforms.util.Reshuffle

这花了一段时间,但我发现了问题所在和解决办法

根本问题在于Beam的便携式runner,特别是将Beam作业转换为Spark作业的情况

翻译代码(由作业服务器执行)根据对sparkContext().defaultParallelism()的调用将阶段拆分为任务。作业服务器没有显式地配置默认并行性(并且不允许用户通过管道选项设置默认并行性),因此,它会返回到理论上的,根据执行器的数量配置并行性(请参见此处的说明https://spark.apache.org/docs/latest/configuration.html#execution-behavior)。调用defaultParallelism()时,这似乎是翻译代码的目标

现在,在实践中,众所周知,当依赖于回退机制时,过早调用sparkContext().defaultParallelism()可能会导致数量低于预期,因为执行者可能尚未向上下文注册。特别是,过早地调用defaultParallelism()将得到2个结果,并且阶段将仅分为2个任务

因此,我的“肮脏黑客”解决方案包括修改作业服务器的源代码,只需在实例化SparkContext之后和执行任何其他操作之前添加3秒的延迟:

$ git diff                                                                                                                                                                                                                                                                                                                         v2.25.0
diff  git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index aa12192..faaa4d3 100644
 - a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -95,7 +95,13 @@ public final class SparkContextFactory {
       conf.setAppName(contextOptions.getAppName());
       // register immutable collections serializers because the SDK uses them.
       conf.set("spark.kryo.registrator", SparkRunnerKryoRegistrator.class.getName());
-      return new JavaSparkContext(conf);
+      JavaSparkContext jsc = new JavaSparkContext(conf);
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+      }
+      return jsc;
     }
   }
 }

在重新编译作业服务器并使用此更改启动它之后,对defaultParallelism()的所有调用都是在注册执行器之后完成的,并且阶段被很好地划分为16个任务(与执行器的数量相同)。正如预期的那样,由于有更多的工作人员在做这项工作,所以现在完成这项工作的速度要快得多(但仍然比纯Spark wordcount慢3倍)

虽然这是可行的,但它当然不是一个很好的解决方案。一个更好的解决方案将是以下之一:

  • 更改翻译引擎,使其能够根据可用执行者的数量以更稳健的方式推断任务的数量
  • 允许用户通过管道选项配置作业服务器用于翻译作业的默认并行性(这是Flink portable runner所做的)

在找到更好的解决方案之前,它显然会阻止在生产集群中使用Beam Spark作业服务器。我将把这个问题发布到Beam的票证队列中,以便能够实现更好的解决方案(希望很快)

相关问题 更多 >