如何在Python Spark脚本中记录日志

69 投票
8 回答
103182 浏览
提问于 2025-04-18 17:55

我有一个用Python写的Spark程序,我是通过 spark-submit 来运行它的。我想在程序里加一些日志记录的语句。

logging.info("This is an informative message.")
logging.debug("This is a debug message.")

我想用Spark本身的日志记录工具,这样我的日志信息就能和Spark的日志格式一致,而且日志的级别也能通过同样的配置文件来控制。我该怎么做呢?

我试着在代码里加 logging 语句,并用 logging.getLogger() 开始。但无论怎样,我只能看到Spark的日志信息,而看不到我自己的日志。我查阅了Python的日志记录文档,但还是搞不明白。

我不确定这是不是提交给Spark的脚本特有的问题,还是我对日志记录的理解有误。

8 个回答

4
import logging

# Logger

logging.basicConfig(format='%(asctime)s %(filename)s %(funcName)s %(lineno)d %(message)s')
logger = logging.getLogger('driver_logger')
logger.setLevel(logging.DEBUG)

用pyspark记录日志的最简单方法!

6

在我的情况下,我很高兴能把我的日志信息添加到工作进程的错误输出中,和普通的Spark日志信息一起显示。

如果这正好符合你的需求,那么诀窍就是把特定的Python日志记录器重定向到stderr

比如,下面这个代码,灵感来自于这个回答,对我来说效果很好:

def getlogger(name, level=logging.INFO):
    import logging
    import sys

    logger = logging.getLogger(name)
    logger.setLevel(level)
    if logger.handlers:
        # or else, as I found out, we keep adding handlers and duplicate messages
        pass
    else:
        ch = logging.StreamHandler(sys.stderr)
        ch.setLevel(level)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    return logger

用法:

def tst_log():
    logger = getlogger('my-worker')
    logger.debug('a')
    logger.info('b')
    logger.warning('c')
    logger.error('d')
    logger.critical('e')
    ...

输出(加上几行上下文以便理解):

17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB)
2017-05-03 03:25:32,849 - my-worker - INFO - b
2017-05-03 03:25:32,849 - my-worker - WARNING - c
2017-05-03 03:25:32,849 - my-worker - ERROR - d
2017-05-03 03:25:32,849 - my-worker - CRITICAL - e
17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0
17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver
8

我们需要从执行节点记录日志,而不是从主节点记录。所以我们做了以下几步:

  1. 我们在所有节点上创建了一个 /etc/rsyslog.d/spark.conf 文件(使用了一种叫做Bootstrap的方法,配合亚马逊的弹性Map Reduce),这样核心节点就能把 local1 的系统日志消息转发到主节点。

  2. 在主节点上,我们启用了UDP和TCP的系统日志监听器,并设置所有的 local 消息都记录到 /var/log/local1.log 文件中。

  3. 我们在映射函数中创建了一个Python的 logging 模块的Syslog记录器。

  4. 现在我们可以使用 logging.info() 来记录日志了。

我们发现同一个数据分区会在多个执行节点上同时处理。显然,Spark在有额外资源时总是这样做。这种方式可以处理执行节点延迟或失败的情况。

map 函数中记录日志让我们对Spark的工作原理有了很多了解。

24

你需要获取Spark本身的日志记录器,默认情况下,getLogger()会返回你自己模块的日志记录器。你可以尝试这样做:

logger = logging.getLogger('py4j')
logger.info("My test info statement")

它也可能是'pyspark'而不是'py4j'

如果你在Spark程序中使用的某个函数(并且这个函数有日志记录)是在主函数所在的同一个模块里定义的,它可能会出现一些序列化错误。

这个问题在这里有解释,另外同一个人提供了一个示例,链接在这里

我也在Spark 1.3.1上测试过这个。

编辑:

要把日志输出从STDERR改为STDOUT,你需要先移除当前的StreamHandler,然后再添加一个新的。

找到现有的Stream Handler(完成后可以删除这一行)

print(logger.handlers)
# will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]

可能只有一个,但如果不止一个,你需要更新位置。

logger.removeHandler(logger.handlers[0])

sys.stdout添加新的处理器

import sys # Put at top if not already there
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
76

你可以从SparkContext对象中获取日志记录器:

log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")

撰写回答