如何通过python读取apachebeam(dataflow)中的JSON文件?

2024-05-16 15:40:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图通过python中的apachebeam读取JSON文件,并对其应用一些数据质量规则。 目前我正在使用beam.io.ReadFromText读取每个json行并使用一些函数修改数据。 读取JSON数据并修改它们的更好方法是什么?在

(p
  | 'Getdata' >> beam.io.ReadFromText(input)
  | 'filter_name' >> beam.FlatMap(lambda line: dq_name(line))
  | 'filter_phone' >> beam.FlatMap(lambda line: dq_phone(line))
  | 'filter_zip' >> beam.FlatMap(lambda line: dq_zip(line))
  | 'filter_address' >> beam.FlatMap(lambda line: dq_city(line))
  | 'filter_website' >> beam.FlatMap(lambda line: dq_website(line))
  | 'write' >> beam.io.WriteToText(output_prefix)  )

注意:我对这一点相当陌生,如果我现在的方法看起来太低劣愚蠢,我很抱歉。在


Tags: 数据方法lambdanameiojsonlinephone
2条回答

我觉得你的管道没问题。它将并行运行,没有任何问题。仅供参考,如果您只使用FlatMap来过滤元素,那么您也可以使用^{}。在

您正从错误的方向接近apachebeam(数据流)。在

您正在尝试读取一行,然后对该行一次应用一个转换。在

相反,你需要看看Beam是一个并行处理器。您将读入所有行ReadFromText(),然后并行地对每一行应用转换。在

查看函数beam.ParDo()。这将允许您创建一个类来处理JSON文件的每一行。然后,您的代码将包含一些主要步骤,如ReadFromText()ParDo(MyJsonProcessor())WriteToText()。在

请记住,您的JSON必须是换行分隔的JSON。http://ndjson.org/

相关问题 更多 >