我对Spark群集配置和Pyspark管道的运行非常有经验,但我只是从Beam开始。因此,我试图在Spark PortableRunner上对Pyspark和Beam python SDK进行苹果对苹果的比较(运行在同一个小Spark集群上,每个集群有4个工作线程,每个工作线程有4个内核和8GB RAM),我已经确定了一个相当大的数据集的字数计算工作,将结果存储在拼花地板表中
因此,我下载了50GB的Wikipedia文本文件,分成大约100个未压缩文件,并将它们存储在/mnt/nfs_drive/wiki_files/
目录中(/mnt/nfs_drive
是安装在所有Worker上的NFS驱动器)
首先,我运行以下Pyspark wordcount脚本:
from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'
spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()
spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add) \
.map(lambda x: Row(word=x[0], count=x[1]))
spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')
脚本运行良好,并在大约8分钟内将拼花文件输出到所需位置。主阶段(读取和拆分令牌)划分为合理数量的任务,以便有效地使用集群:
我现在正试图用Beam和便携式跑步器实现同样的效果。首先,我使用以下命令启动了Spark作业服务器(在Spark master节点上):
docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077
然后,在主节点和工作节点上,我运行SDK线束,如下所示:
docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool
现在Spark cluster已设置为运行梁管道,我可以提交以下脚本:
import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio
options = PipelineOptions([
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=EXTERNAL",
"--environment_config=localhost:50000",
"--job_name=WordCountBeam"
])
wiki_files = '/mnt/nfs_drive/wiki_files/*'
p = beam.Pipeline(options=options)
beam_counts = (
p
| fileio.MatchFiles(wiki_files)
| beam.Map(lambda x: x.path)
| beam.io.ReadAllFromText()
| 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
| beam.combiners.Count.PerElement()
| beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)
_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
pyarrow.schema(
[('word', pyarrow.binary()), ('count', pyarrow.int64())]
)
)
result = p.run().wait_until_finish()
代码提交成功后,我可以在Spark UI上看到作业,工作人员正在执行它。然而,即使运行超过1小时,它也不会产生任何输出
因此,我想确保我的设置没有问题,所以我在一个较小的数据集(只有一个Wiki文件)上运行了完全相同的脚本。这将在大约3.5分钟内成功完成(相同数据集上的Spark wordcount需要16秒!)
我想知道Beam怎么会那么慢,所以我开始研究Beam管道通过作业服务器提交给Spark的DAG。我注意到Spark工作的大部分时间都花在以下阶段:
打印调试行显示此任务是执行“繁重工作”(即从wiki文件读取行和拆分标记)的地方-但是,由于这只发生在两个任务中,因此工作最多将分配给两个工作人员。同样有趣的是,在大型50GB数据集上运行会导致完全相同的DAG,而完全相同数量的任务
我很不确定如何进一步进行。Beam管道似乎降低了并行性,但我不确定这是否是由于作业服务器对管道的次优转换,或者是否应该以其他方式指定PTransforms以增加Spark上的并行性
任何建议,谢谢
管道的文件IO部分可以通过使用
apache_beam.io.textio.ReadFromText(file_pattern='/mnt/nfs_drive/wiki_files/*')
来简化Fusion是可能阻止并行性的另一个原因。解决方案是在读取所有文件后插入一个
apache_beam.transforms.util.Reshuffle
这花了一段时间,但我发现了问题所在和解决办法
根本问题在于Beam的便携式runner,特别是将Beam作业转换为Spark作业的情况
翻译代码(由作业服务器执行)根据对
sparkContext().defaultParallelism()
的调用将阶段拆分为任务。作业服务器没有显式地配置默认并行性(并且不允许用户通过管道选项设置默认并行性),因此,它会返回到理论上的,,根据执行器的数量配置并行性(请参见此处的说明https://spark.apache.org/docs/latest/configuration.html#execution-behavior)。调用defaultParallelism()
时,这似乎是翻译代码的目标现在,在实践中,众所周知,当依赖于回退机制时,过早调用
sparkContext().defaultParallelism()
可能会导致数量低于预期,因为执行者可能尚未向上下文注册。特别是,过早地调用defaultParallelism()
将得到2个结果,并且阶段将仅分为2个任务因此,我的“肮脏黑客”解决方案包括修改作业服务器的源代码,只需在实例化
SparkContext
之后和执行任何其他操作之前添加3秒的延迟:在重新编译作业服务器并使用此更改启动它之后,对
defaultParallelism()
的所有调用都是在注册执行器之后完成的,并且阶段被很好地划分为16个任务(与执行器的数量相同)。正如预期的那样,由于有更多的工作人员在做这项工作,所以现在完成这项工作的速度要快得多(但仍然比纯Spark wordcount慢3倍)虽然这是可行的,但它当然不是一个很好的解决方案。一个更好的解决方案将是以下之一:
在找到更好的解决方案之前,它显然会阻止在生产集群中使用Beam Spark作业服务器。我将把这个问题发布到Beam的票证队列中,以便能够实现更好的解决方案(希望很快)
相关问题 更多 >
编程相关推荐