如何在调试模式下调用PySpark?

2024-05-16 03:45:28 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经用ApacheSpark1.4建立了IntelliJ IDEA。

我想能够添加调试点到我的Spark Python脚本,以便我可以轻松地调试它们。

我正在运行这个Python来初始化spark进程

proc = subprocess.Popen([SPARK_SUBMIT_PATH, scriptFile, inputFile], shell=SHELL_OUTPUT, stdout=subprocess.PIPE)

if VERBOSE:
    print proc.stdout.read()
    print proc.stderr.read()

spark-submit最终调用myFirstSparkScript.py时,调试模式未启用,它将正常执行。不幸的是,编辑Apache Spark源代码并运行自定义副本不是一个可接受的解决方案。

有人知道spark submit是否可以在调试模式下调用apachespark脚本吗?如果是,怎么做?


Tags: 脚本read进程stdoutprocsparksubprocesssubmit
1条回答
网友
1楼 · 发布于 2024-05-16 03:45:28

就我所知,考虑到Spark的架构,你想要的东西是不可能直接实现的。即使没有subprocess调用,程序中唯一可以直接在驱动程序上访问的部分就是SparkContext。与其他部分相比,您实际上被不同的通信层隔离开来,包括至少一个(在本地模式下)JVM实例。为了说明这一点,让我们使用PySpark Internals documentation中的图表。

enter image description here

左框中的内容是可在本地访问并可用于附加调试器的部分。因为它只限于JVM调用,所以实际上没有什么值得您感兴趣的,除非您实际修改了PySpark本身。

右边的部分是远程发生的,并且取决于您使用的集群管理器,从用户的角度来看,这几乎是一个黑匣子。此外,在很多情况下,右边的Python代码只会调用JVM API。

这是不好的部分。好的方面是,大多数时候不需要远程调试。除了访问像TaskContext这样的对象(这些对象很容易被模仿),代码的每个部分都应该很容易在本地运行/测试,而不需要使用Spark实例。

传递给操作/转换的函数采用标准和可预测的Python对象,并期望返回标准Python对象。同样重要的是,这些药物应该没有副作用

因此,在一天结束的时候,你必须要完成你程序的一部分——一个可以交互访问并基于输入/输出和“计算核心”进行测试的薄层,它不需要Spark进行测试/调试。

其他选择

尽管如此,你并不是完全没有选择。

本地模式

被动地将调试器附加到正在运行的解释器

普通GDB和PySpark调试器都可以附加到正在运行的进程。只有在PySpark守护进程和/或工作进程启动后,才能执行此操作。在本地模式下,可以通过执行虚拟操作来强制执行,例如:

sc.parallelize([], n).count()

其中n是本地模式下可用的多个“核心”(local[n])。在类Unix系统上逐步执行的示例过程:

  • 启动PySpark外壳:

    $SPARK_HOME/bin/pyspark 
    
  • 使用pgrep检查是否没有正在运行的守护进程:

    ➜  spark-2.1.0-bin-hadoop2.7$ pgrep -f pyspark.daemon
    ➜  spark-2.1.0-bin-hadoop2.7$
    
  • 在PyCharm中,同样的事情可以通过以下方式确定:

    alt+shift+a并选择附加到本地进程

    enter image description here

    运行->;附加到本地进程

    此时,您应该只看到PySpark shell(可能还有一些不相关的进程)。

    enter image description here

  • 执行虚拟操作:

    sc.parallelize([],1).count()

  • 现在您应该同时看到daemonworker(这里只有一个):

    ➜  spark-2.1.0-bin-hadoop2.7$ pgrep -f pyspark.daemon
    13990
    14046
    ➜  spark-2.1.0-bin-hadoop2.7$
    

    以及

    enter image description here

    具有较低pid的进程是守护进程,具有较高pid的进程是(可能)临时工作进程。

  • 此时,您可以将调试器附加到感兴趣的进程:

    • 在PyCharm中通过选择连接的过程。
    • 使用普通的GDB调用:

      gdb python <pid of running process>
      

这种方法最大的缺点是,您在正确的时间找到了正确的解释器。

分布式模式

使用连接到调试器服务器的活动组件)

带着妖精

PyCharm提供了Python Debug Server,可用于PySpark作业。

首先,应为远程调试器添加配置:

  • alt+shift+a并选择编辑配置运行->;编辑配置
  • 单击添加新配置(green plus),然后选择Python Remote Debug。
  • 配置host和端口取决于您自己的配置(确保端口和可从远程计算机访问)

    enter image description here

  • 启动调试服务器:

    移位+F9

    您应该看到调试器控制台:

    enter image description here

  • 通过安装或分发egg文件,确保可以在工作节点上访问pyddev

  • ^{}使用必须包含在代码中的活动组件:

    import pydevd
    pydevd.settrace(<host name>, port=<port number>)
    

    棘手的部分是找到包含它的正确位置,除非调试批处理操作(如传递给mapPartitions的函数),否则可能需要修补PySpark源本身,例如pyspark.daemon.workerRDD方法,如RDD.mapPartitions。假设我们对调试工作者行为感兴趣。可能的补丁如下:

    diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
    index 7f06d4288c..6cff353795 100644
    --- a/python/pyspark/daemon.py
    +++ b/python/pyspark/daemon.py
    @@ -44,6 +44,9 @@ def worker(sock):
         """
         Called by a worker process after the fork().
         """
    +    import pydevd
    +    pydevd.settrace('foobar', port=9999, stdoutToServer=True, stderrToServer=True)
    +
         signal.signal(SIGHUP, SIG_DFL)
         signal.signal(SIGCHLD, SIG_DFL)
         signal.signal(SIGTERM, SIG_DFL)
    

    如果决定修补Spark源,请确保使用位于$SPARK_HOME/python/lib中的修补源非打包版本。

  • 执行PySpark代码。返回调试器控制台,享受以下乐趣:

    enter image description here

其他工具

有很多工具,包括python-manhole^{},可以通过一些努力来使用PySpark。

注意:

当然,您可以在本地模式下使用“remote”(活动)方法,在某种程度上也可以在分布式模式下使用“local”方法(您可以连接到工作节点并遵循与本地模式中相同的步骤)。

相关问题 更多 >