在apache beam d中如何将csv转换成字典

2024-04-26 09:58:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我想读取一个csv文件并使用apache beam数据流将其写入BigQuery。为此,我需要以字典的形式将数据呈现给BigQuery。如何使用apache beam转换数据以实现此目的?

我的输入csv文件有两列,我想在BigQuery中创建一个后续的两列表。我知道如何在BigQuery中创建数据,这是直截了当的,我不知道如何将csv转换成字典。下面的代码是不正确的,但应该给我一个想法,我正在尝试做什么。

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
   beam.io.BigQuerySink(
   output_table,
   schema='month:INTEGER, tornado_count:INTEGER',
   create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
   write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()

Tags: 文件csvto数据io字典apachecreate
2条回答

作为巴勃罗职位的补充,我想和大家分享一下我对他的样品做的一些改动。(+1给你!)

更改: reader = csv.reader(self._file)reader = csv.DictReader(self._file)

csv.DictReader使用CSV文件的第一行作为Dict键。其他行用于用dict的值填充每行dict。它会根据列顺序自动将正确的值放入正确的键。

一个小细节是Dict中的每个值都存储为字符串。如果对某些字段使用例如INTEGER,这可能会与BigQuery架构冲突。所以你以后要注意正确的选角。

编辑:从版本2.12.0开始,Beam提供了新的fileio转换,允许您从CSV读取数据,而无需重新实现源代码。你可以这样做:

def get_csv_reader(readable_file):
  # You can return whichever kind of reader you want here
  # a DictReader, or a normal csv.reader.
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with Pipeline(...) as p:
  content_pc = (p
                | beam.io.fileio.MatchFiles("/my/file/name")
                | beam.io.fileio.ReadMatches()
                | beam.Reshuffle()  # Useful if you expect many matches
                | beam.FlatMap(get_csv_reader))

我最近为Apache Beam编写了一个测试。你可以看看the Github repository


旧的答案依赖于重新实现源代码。这不再是推荐的主要方式:)

其思想是有一个返回解析的CSV行的源。您可以通过子类化FileBasedSource类以包含CSV解析来实现这一点。尤其是read_records函数看起来像这样:

class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    self._file = self.open_file(file_name)

    reader = csv.reader(self._file)

    for rec in reader:
      yield rec

相关问题 更多 >