Spark(python)数据帧收集随机OOM

2024-05-14 07:01:57 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个由2个worker节点和1个驱动节点组成的小Spark(v2.1.1)集群。硬件配置如下: 主/驱动节点-4核,8gb RAM 工人-4核,16gb内存

Spark提交作业的Spark属性在这里-Spark properties screenshot。这项工作很简单——对于给定的搜索参数,从Elasticsearch集群中提取文档,并计算数据的一些度量。从Es文档创建一个spark数据帧。数据帧有15列,行数可以从50万行到10万行不等。数据帧被缓存,因为它是所有度量计算的源。计算包括总计数、总和、平均值等。以下是其中一个计算的代码片段-

''' Start & End journey pages '''
df_frag = df.select('visit_id', 'hit_seq_nbr', 'concategory')
df_frag = df_frag.withColumn('hit_seq_nbr', 
df_frag['hit_seq_nbr'].cast(ShortType()))
grpd = df_frag.groupby('visit_id').agg(min('hit_seq_nbr').alias('hit_min'), max('hit_seq_nbr').alias('hit_max'))
df_frag2 = df_frag.join(grpd, on='visit_id', how='left')
first_pages_df = df_frag2.select('concategory').filter('hit_seq_nbr == hit_min')\
                                           .groupby('concategory')\
                                           .agg(count('concategory').alias('page_count'))

# total = first_pages_df.select('page_count').groupBy().sum('page_count').withColumnRenamed('sum(page_count)', 'total_page_count')
# first_pages_df = first_pages_df.crossJoin(total)
# first_pages_df = first_pages_df.select('concategory',
#                                        round(100. * col('page_count') / col('total_page_count'), scale=2).alias('perc'))\
#                                .orderBy('perc', ascending=False)

total = first_pages_df.select('page_count').groupBy().sum('page_count').withColumnRenamed('sum(page_count)', 'total_page_count')
total = total.collect()[0]['total_page_count']
perc = udf(lambda r: 100. * r / total)
first_pages_df = first_pages_df.withColumn('perc', round(perc('page_count'), scale=2)).orderBy('perc', ascending=False)
# first_pages_df.show()
# print total
# print '======================='
top3_rows = first_pages_df.take(3)
the_metrics['journey_start'] = [(r.concategory, r.perc) for r in top3_rows]
dispose_dataframe([first_pages_df, grpd, total, df_frag])

对于10M+行的数据集,缓存的数据帧消耗大约300MB的可用存储内存(Spark Executors screenshot)。在

这个spark作业遇到的问题是在最后一次计算中出现随机OOM错误,它的.collect()与上面的代码段类似。我说随机是因为有一些运行已经成功完成,但是大多数运行都是在oom中出错的。作业运行良好,数据集为<;10M行。此外,一旦遇到OOM,错误会在随后的10M+作业中重复出现。此时,我只需重新运行停止/启动主&从脚本。我在寻找这些问题的答案-

  1. 随机发生OOM的原因是什么?

  2. collect()应用于最后一个转换,即聚合数据帧。据我所知,这应该限制调用collect()的dataframe的大小(在大多数情况下,它是一个单行列dataframe),但是OOM就发生在这里。不管转换如何,每当遇到collect()时,整个数据集都会被加载到驱动程序内存中吗?

  3. 我不完全相信这是一个硬件短缺,因为偶尔成功运行。或者这是一个驱动硬件的缺点,为大型数据集?


Tags: 数据dfcountpagepagesselectseqtotal

热门问题