在Spark workers上更改PYSPARK_PYTHON

2024-04-25 07:20:31 发布

您现在位置:Python中文网/ 问答频道 /正文

我们发布了使用Spark的Python应用程序,以及Python 3.7解释器(python.exe,所有必要的lib都位于MyApp.exe附近)

要设置PYSPARK_PYTHON,我们有一个函数,该函数确定到python.exe的路径:

os.environ['PYSPARK_PYTHON'] = get_python()  

在Windows上PYSPARK_PYTHON将变成C:/MyApp/python.exe
在Ubuntu上PYSPARK_PYTHON将变成/opt/MyApp/python.exe

我们启动主/驱动程序节点并在Windows上创建SparkSession。然后我们在Ubuntu上启动worker节点,但worker失败,原因是:

Job aborted due to stage failure: Task 1 in stage 11.0 failed 4 times, most recent failure: Lost task 1.3 in stage 11.0 (TID 1614, 10.0.2.15, executor 1): java.io.IOException: Cannot run program "C:/MyApp/python.exe": error=2, No such file or directory

当然,ubuntu上没有C:/MyApp/python.exe

如果我正确理解了这个错误,驱动程序中的PYSPARK_PYTHON将发送给所有工作人员

还尝试在{}和{}中设置{}。如何将Ubuntu workers上的PYSPARK_PYTHON更改为/opt/MyApp/python.exe


Tags: 函数in应用程序节点failureubuntuwindows驱动程序
1条回答
网友
1楼 · 发布于 2024-04-25 07:20:31

浏览souce代码时,Python驱动程序代码似乎在^{}中创建用于运行Python函数的工作项时,将Python可执行路径的值从其Spark上下文中放入:

def _wrap_function(sc, func, deserializer, serializer, profiler=None):
    assert deserializer, "deserializer should not be empty"
    assert serializer, "serializer should not be empty"
    command = (func, profiler, deserializer, serializer)
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
                                                                             ^^^^^^^^^^^^^
                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)

Python运行程序^{}然后使用它接收到的第一个工作项中存储的路径来启动新的解释器实例:

private[spark] abstract class BasePythonRunner[IN, OUT](
    funcs: Seq[ChainedPythonFunctions],
    evalType: Int,
    argOffsets: Array[Array[Int]])
  extends Logging {
  ...
  protected val pythonExec: String = funcs.head.funcs.head.pythonExec
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  ...
  def compute(
      inputIterator: Iterator[IN],
      partitionIndex: Int,
      context: TaskContext): Iterator[OUT] = {
    ...
    val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
    ...
  }
  ...
}

基于这一点,恐怕目前不可能在master和worker中对Python可执行文件进行单独的配置。另见问题SPARK-26404的第三条评论。也许您应该向ApacheSpark项目提交RFE

虽然我不是Spark大师,但可能仍然有办法做到这一点,也许可以将PYSPARK_PYTHON设置为"python",然后确保系统PATH被相应地配置,以便您的Python可执行文件排在第一位

相关问题 更多 >