我们正在使用pythonsdk在googledataflow中运行日志文件解析作业。数据分布在数百个每日日志中,我们通过文件模式从云存储中读取这些日志。所有文件的数据量约为5-8gb(gz文件),总共有5000-8000万行。在
loglines = p | ReadFromText('gs://logfile-location/logs*-20180101')
此外,我们有一个简单(小)映射csv,它将日志文件条目映射到人类可读的文本。有大约400行,5 kb大小。在
例如,带有[param=testing2]的日志文件条目应该映射到最终输出中的“Customer requested 14day free product trial”。在
我们用一个简单的光束。地图使用sideinput,如下所示:
^{pr2}$其中map_logentries是映射函数,mappingTable是映射表。在
但是,只有通过open()/read()读取原生python中的映射表时,这才有效。如果我们通过ReadFromText()使用梁管道执行相同的操作,并将生成的PCollection作为侧面输入传递到贴图,如下所示:
mappingTable = p | ReadFromText('gs://side-inputs/category-mapping.csv')
customerActions = loglines | beam.Map(map_logentries,beam.pvalue.AsIter(mappingTable))
性能完全分解为每秒2-3项。在
现在,我的问题是:
对我们来说,映射确实经常更改,我需要找到一种方法让“普通”用户提供它。其想法是在云存储中提供映射csv,并通过ReadFromText()将其合并到管道中。在本地阅读需要向工人提供地图,因此只有技术团队才能做到这一点。在
我知道边输入有缓存问题,但这肯定不适用于5kb的输入。在
上面所有的代码都是用来解释问题的伪代码。如有任何想法和想法,将不胜感激!在
对于更有效的侧输入(中小尺寸),您可以利用
beam.pvalue.AsList(mappingTable)
由于AsList
导致Beam具体化数据,所以您肯定会得到该pcollection的内存列表。在代码看起来不错。然而,既然
mappingTable
是一个映射,beam.pvalue.AsDict
不更适合您的用例吗?您的}转换为} 将它们连接起来。
mappingTable
足够小,所以side input是一个很好的用例。 假定mappingTable
也是静态的,您可以从DoFn
函数的start_bundle
中的GCS加载它。有关详细信息,请参阅this帖子的答案。如果mappingTable
将来变得非常大,您还可以考虑将您的map_logentries
和{PCollection
的键值对,并使用^{相关问题 更多 >
编程相关推荐