我试图通过python中的apachebeam读取JSON文件,并对其应用一些数据质量规则。 目前我正在使用beam.io.ReadFromText读取每个json行并使用一些函数修改数据。 读取JSON数据并修改它们的更好方法是什么?在
(p
| 'Getdata' >> beam.io.ReadFromText(input)
| 'filter_name' >> beam.FlatMap(lambda line: dq_name(line))
| 'filter_phone' >> beam.FlatMap(lambda line: dq_phone(line))
| 'filter_zip' >> beam.FlatMap(lambda line: dq_zip(line))
| 'filter_address' >> beam.FlatMap(lambda line: dq_city(line))
| 'filter_website' >> beam.FlatMap(lambda line: dq_website(line))
| 'write' >> beam.io.WriteToText(output_prefix) )
注意:我对这一点相当陌生,如果我现在的方法看起来太低劣愚蠢,我很抱歉。在
我觉得你的管道没问题。它将并行运行,没有任何问题。仅供参考,如果您只使用} 。在
FlatMap
来过滤元素,那么您也可以使用^{您正从错误的方向接近apachebeam(数据流)。在
您正在尝试读取一行,然后对该行一次应用一个转换。在
相反,你需要看看Beam是一个并行处理器。您将读入所有行
ReadFromText()
,然后并行地对每一行应用转换。在查看函数
beam.ParDo()
。这将允许您创建一个类来处理JSON文件的每一行。然后,您的代码将包含一些主要步骤,如ReadFromText()
、ParDo(MyJsonProcessor())
、WriteToText()
。在请记住,您的JSON必须是换行分隔的JSON。http://ndjson.org/
相关问题 更多 >
编程相关推荐