从Bigquery读取几行作为侧输入,得到

2024-03-29 00:45:43 发布

您现在位置:Python中文网/ 问答频道 /正文

我在数据流管道的侧面输入(尤其是BQ)方面遇到了问题,即使在继续coursera并查看了示例之后。在

现在,我有一个管道,它读取gcs存储桶中的文件,获取它们的文件名,然后转换文件并将给定数量的行写入bigquery。我试图找出如何将文件名映射到bq中的特定“key”。在

result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
result = reduce(add, result)
#create each input PCollection name
variables = ['p{}'.format(i) for i in range(len(result))]

根据结果,我构建一个包含所有文件名(filename1,filename2…)的元组动态创建一个查询,即: Bqquery = "SELECT FILENAME, FILE_ID from 'project:dataset.table' where FILENAME IN (filename tuple)"我想我应该这样做,因为一次大约有20个文件,所以从bq获取数据一次是有意义的,而不是在for循环中获取文件的id。在

所以我做了

^{pr2}$

我还尝试在for循环中执行查询,一次只获取一个文件名(请参阅注释掉的代码),但这也不起作用。最终我要做的是将beam.Pardo(AddFilenamesFn(), current_file)更改为beam.Pardo(AddFileNamesFn(), file_id),所以我添加的不是实际的文件名,而是添加fileid

[注意代码中提到的标签(即read_labels[i])只是数据流的标签]

我想我错过了一些关于pcollections的基本知识,但不确定


Tags: 文件代码inidformatfor管道文件名
1条回答
网友
1楼 · 发布于 2024-03-29 00:45:43

考虑到前面的question中的代码,我认为最简单的解决方案是在for循环中的AddFilenamesFnParDo中运行查询。请记住,beam.io.Read(beam.io.BigQuerySource(query=bqquery))用于将行作为源读取,而不是在中间步骤中。因此,在我建议的情况下,您可以直接使用Python客户机库(google-cloud-bigquery>0.27.0):

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with file id (retrieved from BigQuery) and row"""
    def process(self, element, file_path):
        from google.cloud import bigquery

        client = bigquery.Client()
        file_name = file_path.split("/")[-1]

        query_job = client.query("""
            SELECT FILE_ID
            FROM test.file_mapping
            WHERE FILENAME = '{0}'
            LIMIT 1""".format(file_name))

        results = query_job.result()

        for row in results:
          file_id = row.FILE_ID

        yield {'filename':file_id, 'row':element}

这将是实施的最直接的解决方案,但可能会产生问题。我们不是在管道开始时运行所有20个可能的查询,而是对每一行/记录运行一个查询。例如,如果一个文件中有3000个元素,则同一个查询将被启动3000次。但是,每个不同的查询实际上应该只运行一次,随后的查询“repeats”将命中cache。还请注意,缓存的查询不会对交互式查询limit起作用。在

我使用了与我之前的answer相同的文件:

^{pr2}$

并添加了一个新表:

bq mk test.file_mapping FILENAME:STRING,FILE_ID:STRING
bq query  use_legacy_sql=false 'INSERT INTO test.file_mapping (FILENAME, FILE_ID) values ("countries1.csv", "COUNTRIES ONE"), ("countries2.csv", "COUNTRIES TWO")'

enter image description here

输出是:

INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'id,country'}
INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'1,sweden'}
INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'2,spain'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'id,country'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'3,italy'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'4,france'}

另一种解决方案是加载所有的表并用beam.io.BigQuerySource()将其具体化为一个侧面输入(这当然会有问题),或者,如您所说,将其分解为N个查询,并将每个查询保存到不同的侧面输入中。然后您可以为每个记录选择适当的一个,并将其作为附加输入传递给AddFilenamesFn。写这篇文章也很有意思。在

我提出的第一个解决方案的完整代码:

import argparse, logging
from operator import add

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with file id (retrieved from BigQuery) and row"""
    def process(self, element, file_path):
        from google.cloud import bigquery

        client = bigquery.Client()

        file_name = file_path.split("/")[-1]

        query_job = client.query("""
            SELECT FILE_ID
            FROM test.file_mapping
            WHERE FILENAME = '{0}'
            LIMIT 1""".format(file_name))

        results = query_job.result()

        for row in results:
          file_id = row.FILE_ID

        yield {'filename':file_id, 'row':element}

# just logging output to visualize results
def write_res(element):
  logging.info(element)
  return element

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  p = beam.Pipeline(options=PipelineOptions(pipeline_args))
  gcs = GCSFileSystem(PipelineOptions(pipeline_args))
  gcs_reader = GCSFileReader(gcs)

  # in my case I am looking for files that start with 'countries'
  BUCKET='BUCKET_NAME'
  result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
  result = reduce(add, result)

  # create each input PCollection name and unique step labels
  variables = ['p{}'.format(i) for i in range(len(result))]
  read_labels = ['Read file {}'.format(i) for i in range(len(result))]
  add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

  # load each input file into a separate PCollection and add filename to each row
  for i in range(len(result)):
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

  # flatten all PCollections into a single one
  merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

  p.run()

if __name__ == '__main__':
  run()

相关问题 更多 >