如何在Python Spark脚本中记录日志
我有一个用Python写的Spark程序,我是通过 spark-submit
来运行它的。我想在程序里加一些日志记录的语句。
logging.info("This is an informative message.")
logging.debug("This is a debug message.")
我想用Spark本身的日志记录工具,这样我的日志信息就能和Spark的日志格式一致,而且日志的级别也能通过同样的配置文件来控制。我该怎么做呢?
我试着在代码里加 logging
语句,并用 logging.getLogger()
开始。但无论怎样,我只能看到Spark的日志信息,而看不到我自己的日志。我查阅了Python的日志记录文档,但还是搞不明白。
我不确定这是不是提交给Spark的脚本特有的问题,还是我对日志记录的理解有误。
8 个回答
import logging
# Logger
logging.basicConfig(format='%(asctime)s %(filename)s %(funcName)s %(lineno)d %(message)s')
logger = logging.getLogger('driver_logger')
logger.setLevel(logging.DEBUG)
用pyspark记录日志的最简单方法!
在我的情况下,我很高兴能把我的日志信息添加到工作进程的错误输出中,和普通的Spark日志信息一起显示。
如果这正好符合你的需求,那么诀窍就是把特定的Python日志记录器重定向到stderr
。
比如,下面这个代码,灵感来自于这个回答,对我来说效果很好:
def getlogger(name, level=logging.INFO):
import logging
import sys
logger = logging.getLogger(name)
logger.setLevel(level)
if logger.handlers:
# or else, as I found out, we keep adding handlers and duplicate messages
pass
else:
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
用法:
def tst_log():
logger = getlogger('my-worker')
logger.debug('a')
logger.info('b')
logger.warning('c')
logger.error('d')
logger.critical('e')
...
输出(加上几行上下文以便理解):
17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB)
2017-05-03 03:25:32,849 - my-worker - INFO - b
2017-05-03 03:25:32,849 - my-worker - WARNING - c
2017-05-03 03:25:32,849 - my-worker - ERROR - d
2017-05-03 03:25:32,849 - my-worker - CRITICAL - e
17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0
17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver
我们需要从执行节点记录日志,而不是从主节点记录。所以我们做了以下几步:
我们在所有节点上创建了一个
/etc/rsyslog.d/spark.conf
文件(使用了一种叫做Bootstrap的方法,配合亚马逊的弹性Map Reduce),这样核心节点就能把local1
的系统日志消息转发到主节点。在主节点上,我们启用了UDP和TCP的系统日志监听器,并设置所有的
local
消息都记录到/var/log/local1.log
文件中。我们在映射函数中创建了一个Python的
logging
模块的Syslog记录器。现在我们可以使用
logging.info()
来记录日志了。
我们发现同一个数据分区会在多个执行节点上同时处理。显然,Spark在有额外资源时总是这样做。这种方式可以处理执行节点延迟或失败的情况。
在 map
函数中记录日志让我们对Spark的工作原理有了很多了解。
你需要获取Spark本身的日志记录器,默认情况下,getLogger()
会返回你自己模块的日志记录器。你可以尝试这样做:
logger = logging.getLogger('py4j')
logger.info("My test info statement")
它也可能是'pyspark'
而不是'py4j'
。
如果你在Spark程序中使用的某个函数(并且这个函数有日志记录)是在主函数所在的同一个模块里定义的,它可能会出现一些序列化错误。
这个问题在这里有解释,另外同一个人提供了一个示例,链接在这里。
我也在Spark 1.3.1上测试过这个。
编辑:
要把日志输出从STDERR
改为STDOUT
,你需要先移除当前的StreamHandler
,然后再添加一个新的。
找到现有的Stream Handler(完成后可以删除这一行)
print(logger.handlers)
# will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]
可能只有一个,但如果不止一个,你需要更新位置。
logger.removeHandler(logger.handlers[0])
为sys.stdout
添加新的处理器
import sys # Put at top if not already there
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
你可以从SparkContext对象中获取日志记录器:
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")