PySpark作业永远挂起(在实时分析期间)

2024-06-16 11:58:01 发布

您现在位置:Python中文网/ 问答频道 /正文

一周前,我贴了一个类似的问题,但一直没有得到回答。 我花了很多时间调试我遇到的问题,请允许我简要介绍一下问题:

大约8-16小时(平均11小时)后,有一个工作卡住了,并引发停滞。截图

enter image description here

以及

enter image description here

如果我手动终止那份工作,火花就会崩溃。在

log4j日志根本不显示任何警告/错误。所以我添加了我自己的记录器来找出哪一步失败了。我的代码如下:

DS = KafkaUtils.createDirectStream(ssc, ...)
dstream = DS.map(...)
dstream.foreachRDD(lambda time, rdd:
    rdd.foreachPartition(lamda parti: doWork(time, parti) )
)

def doWork(time, parti):
    for part in parti:
            mention = part['mention'] # extract string from json
            words = nltk.wordpunct_tokenize(mention)
            kw = part['keyword']
            #...
            log.info("I")
            if len(set(dictKeywords[kw]).intersection([w.lower() for w in words])) <= 0:
                    retobj['keeper']=0 # don't keep it
            elif detect(editedMention) != 'en':
                    retobj['keeper']=0 # don't keep it

            cleantxt = ppr.clean(mention)
            log.info("J")
            # ...

以下是作业卡住时的日志文件:

^{pr2}$

它应该打印“J”,但它没有,因此有三个函数之一导致它挂起/崩溃/失速:集合。相交/ 语言检测/tweet预处理器ppr。在

但这没什么意义,为什么这么久之后就失败了?我的代码中到处都有“try+except”块,如果有异常,它就会被记录下来。在

  • 我在本地模式(单节点)。在
  • 我使用“spark submit”启动python脚本。在
  • 我试过让Spark同时使用python3.5.2和python3.6。在
  • 我尝试过缓存RDD和不缓存。在
  • 这不是OOM问题,GC日志没有显示任何异常。在

有什么想法吗?谢谢!!!在


Tags: 代码inlogfortimedswords小时
1条回答
网友
1楼 · 发布于 2024-06-16 11:58:01

@user8371915非常感谢您的帮助。 我相信我已经发现了问题并正在进行最后的测试。在

我使用的Tweet预处理器模块(source:https://github.com/s/preprocessor)有一个非常讨厌的bug。看看这个代码:

import preprocessor as ppr
mention = "Try this Bitcoin Price app https://itunes.apple.com/in/app/bitcoin-price-calculator/id1315298877?mt=8[app](https://itunes.apple.com/in/app/bitcoin-price-calculator/id1315298877?mt=8)"
print(mention)
cleantxt = ppr.clean(mention)
print(cleantxt)

上面的代码将永远停留在“clean”方法中。但这只会发生在一个非常具体的tweet(如上面的一个)。所以在遇到这样一条微博之前,需要几个小时。在

我删除了与预处理器模块相关的所有内容,并重新设计了我的代码。我相信我不会再有这个问题了。在

相关问题 更多 >