来自PCollection的googledataflow/Apache Beam Python SideInput杀死performan

2024-06-16 11:04:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我们正在使用pythonsdk在googledataflow中运行日志文件解析作业。数据分布在数百个每日日志中,我们通过文件模式从云存储中读取这些日志。所有文件的数据量约为5-8gb(gz文件),总共有5000-8000万行。在

loglines = p | ReadFromText('gs://logfile-location/logs*-20180101')

此外,我们有一个简单(小)映射csv,它将日志文件条目映射到人类可读的文本。有大约400行,5 kb大小。在

例如,带有[param=testing2]的日志文件条目应该映射到最终输出中的“Customer requested 14day free product trial”。在

我们用一个简单的光束。地图使用sideinput,如下所示:

^{pr2}$

其中map_logentries是映射函数,mappingTable是映射表。在

但是,只有通过open()/read()读取原生python中的映射表时,这才有效。如果我们通过ReadFromText()使用梁管道执行相同的操作,并将生成的PCollection作为侧面输入传递到贴图,如下所示:

mappingTable = p | ReadFromText('gs://side-inputs/category-mapping.csv')    
customerActions = loglines | beam.Map(map_logentries,beam.pvalue.AsIter(mappingTable))

性能完全分解为每秒2-3项。在

现在,我的问题是:

  1. 为什么表现会如此糟糕,传球有什么问题 PCollection作为侧输入?在
  2. 如果不建议使用 PCollections作为侧输入,如何构建这样的 需要可以/不应硬编码到的映射的管道 映射函数?在

对我们来说,映射确实经常更改,我需要找到一种方法让“普通”用户提供它。其想法是在云存储中提供映射csv,并通过ReadFromText()将其合并到管道中。在本地阅读需要向工人提供地图,因此只有技术团队才能做到这一点。在

我知道边输入有缓存问题,但这肯定不适用于5kb的输入。在

上面所有的代码都是用来解释问题的伪代码。如有任何想法和想法,将不胜感激!在


Tags: 文件csv代码gsmap管道地图条目
2条回答

对于更有效的侧输入(中小尺寸),您可以利用 beam.pvalue.AsList(mappingTable) 由于AsList导致Beam具体化数据,所以您肯定会得到该pcollection的内存列表。在

Intended for use in side-argument specification -the same places where AsSingleton and AsIter are used, but forces materialization of this PCollection as a list.

Source: https://beam.apache.org/documentation/sdks/pydoc/2.2.0/apache_beam.pvalue.html?highlight=aslist#apache_beam.pvalue.AsList

  1. 代码看起来不错。然而,既然mappingTable是一个映射,beam.pvalue.AsDict不更适合您的用例吗?

  2. 您的mappingTable足够小,所以side input是一个很好的用例。 假定mappingTable也是静态的,您可以从DoFn函数的start_bundle中的GCS加载它。有关详细信息,请参阅this帖子的答案。如果mappingTable将来变得非常大,您还可以考虑将您的map_logentries和{}转换为PCollection的键值对,并使用^{}将它们连接起来。

相关问题 更多 >