PySpark流作业避免对象序列化

2024-06-06 20:36:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在写一份PySpark的工作,但是我遇到了一些性能问题。 基本上,它所做的就是从Kafka读取事件并记录所做的转换。 问题是,转换是根据对象的函数计算的,而这个对象是相当沉重的,因为它包含一个图形和一个自动更新的内部缓存。 所以当我写下这段代码时:

analyzer = ShortTextAnalyzer(root_dir)
logger.info("Start analyzing the documents from kafka")
ssc.union(*streams).filter(lambda x: x[1] != None).foreachRDD(lambda rdd: rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1])))

它序列化我的analyzer,由于图形的原因,这需要很多时间,当它被复制到执行器时,缓存只与特定的RDD相关。在

如果作业是用Scala编写的,那么我就可以编写一个存在于每个执行器中的对象,那么我的对象就不必每次都序列化了。在

在Python中有没有一种方法可以做到这一点?为每个执行器创建一次对象,然后它可以避免序列化过程?在

提前感谢:)

更新: 我读过post How to run a function on all Spark workers before processing data in PySpark?,但那里的答案是关于共享文件或广播变量的。 我的对象不能被广播,因为他不是只读的。它不断地更新它的内部缓存,这就是为什么我希望在每个执行器上都有一个它的对象(以避免序列化的需要)。在


Tags: kafka对象lambda函数代码图形序列化记录
1条回答
网友
1楼 · 发布于 2024-06-06 20:36:16

最后我所做的就是避免对象被序列化,把我的类转换成一个静态的类变量和类方法。这样,每个执行器只导入一次该类(及其相关变量),而不需要序列化。在

相关问题 更多 >