<p>这花了一段时间,但我发现了问题所在和解决办法</p>
<p>根本问题在于Beam的便携式runner,特别是将Beam作业转换为Spark作业的情况</p>
<p>翻译代码(由作业服务器执行)根据对<code>sparkContext().defaultParallelism()</code>的调用将阶段拆分为任务。作业服务器没有显式地配置默认并行性(并且不允许用户通过管道选项设置默认并行性),因此,它会返回到理论上的<em>,</em>,根据执行器的数量配置并行性(请参见此处的说明<a href="https://spark.apache.org/docs/latest/configuration.html#execution-behavior" rel="nofollow noreferrer">https://spark.apache.org/docs/latest/configuration.html#execution-behavior</a>)。调用<code>defaultParallelism()</code>时,这似乎是翻译代码的目标</p>
<p>现在,<em>在实践中,</em>众所周知,当依赖于回退机制时,过早调用<code>sparkContext().defaultParallelism()</code>可能会导致数量低于预期,因为执行者可能尚未向上下文注册。特别是,过早地调用<code>defaultParallelism()</code>将得到2个结果,并且阶段将仅分为2个任务</p>
<p>因此,我的“肮脏黑客”解决方案包括修改作业服务器的源代码,只需在实例化<code>SparkContext</code>之后和执行任何其他操作之前添加3秒的延迟:</p>
<pre><code>$ git diff v2.25.0
diff git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index aa12192..faaa4d3 100644
- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -95,7 +95,13 @@ public final class SparkContextFactory {
conf.setAppName(contextOptions.getAppName());
// register immutable collections serializers because the SDK uses them.
conf.set("spark.kryo.registrator", SparkRunnerKryoRegistrator.class.getName());
- return new JavaSparkContext(conf);
+ JavaSparkContext jsc = new JavaSparkContext(conf);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ }
+ return jsc;
}
}
}
</code></pre>
<p>在重新编译作业服务器并使用此更改启动它之后,对<code>defaultParallelism()</code>的所有调用都是在注册执行器之后<em>完成的,并且阶段被很好地划分为16个任务(与执行器的数量相同)。正如预期的那样,由于有更多的工作人员在做这项工作,所以现在完成这项工作的速度要快得多(但仍然比纯Spark wordcount慢3倍)</p>
<p>虽然这是可行的,但它当然不是一个很好的解决方案。一个更好的解决方案将是以下之一:</p>
<ul>
<li>更改翻译引擎,使其能够根据可用执行者的数量以更稳健的方式推断任务的数量</李>
<li>允许用户通过管道选项配置作业服务器用于翻译作业的默认并行性(这是Flink portable runner所做的)</李>
</ul>
<p>在找到更好的解决方案之前,它显然会阻止在生产集群中使用Beam Spark作业服务器。我将把这个问题发布到Beam的票证队列中,以便能够实现更好的解决方案(希望很快)</p>