
2024-06-16 11:45:58

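I'm loading CSV files from GCS into BigQuery and triggering the job through Cloud Composer (which then goes on to do some other work). Because some fields contain all kinds of characters, the `bq load` command can't parse the files correctly, so I'm turning to Dataflow to help with the parsing and loading. There are 8 files, each ~1 GB. Loaded directly from GCS to BQ, the data has 96 columns and about 3M records. Most fields are strings, with a few numeric and timestamp types.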
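For reference, here is a minimal sketch, on hypothetical sample data, of how Python's `csv` module handles the kinds of characters that usually break simpler parsers: commas, doubled quotes and line breaks inside quoted fields. It assumes standard RFC 4180-style quoting. The actual problem characters in these files aren't known, so this only illustrates the mechanism.

```python
import csv
import io

# Hypothetical sample: quoted fields containing a comma, an escaped quote ("")
# and an embedded line break.
SAMPLE = (
    'id,comment,amount\r\n'
    '1,"contains, a comma",10.5\r\n'
    '2,"say ""hi""",3\r\n'
    '3,"line one\nline two",7\r\n'
)


def parse_rows(text):
    """Parse CSV text into a list of dicts, one per logical record."""
    # newline='' passes raw line endings through to the csv module, so line
    # breaks inside quoted fields stay part of the field value.
    with io.StringIO(text, newline='') as handle:
        return list(csv.DictReader(handle))


rows = parse_rows(SAMPLE)
for row in rows:
    print(row)
```

Every value comes back as a string, including `amount`. That matters later because some of the BigQuery columns are numeric or timestamps.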




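I also tried gzipping the CSV files first and reading those in. Instead of `io.TextIOWrapper` I can use `gzip.open(_io.BufferedReader, 'rt')`, and that works like a charm: the pipeline runs all the way through and finishes in about 20 minutes (which still feels long given what Dataflow claims it can do, but if that's the best I can get, I can live with it). `TextIOWrapper` seems to be slowing things down significantly (just my guess), and the other solutions I tried didn't seem to work either.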
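The gzip variant can be sketched locally. Here an in-memory `io.BytesIO` stands in for the binary stream returned by the GCS reader, and the sample data is hypothetical. In CPython, `gzip.open` in `'rt'` mode itself wraps the decompressed stream in an `io.TextIOWrapper`. The wrapper on its own may therefore not be the whole explanation for the speed difference.

```python
import csv
import gzip
import io


def make_gzipped_csv(text):
    """Compress CSV text in memory; stands in for a .csv.gz object in GCS."""
    return gzip.compress(text.encode('utf-8'))


def read_gzipped_csv(binary_stream):
    """Yield one dict per row from a binary stream holding gzipped CSV."""
    # gzip.open accepts an already-open binary file object; mode 'rt'
    # decompresses and decodes in one step (assumes UTF-8 input).
    with gzip.open(binary_stream, 'rt', encoding='utf-8', newline='') as text:
        yield from csv.DictReader(text)


payload = make_gzipped_csv(
    'name,ts\nalpha,2024-06-16 11:45:58\nbeta,2024-06-17 08:00:00\n'
)
rows = list(read_gzipped_csv(io.BytesIO(payload)))
print(rows)
```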


(I also tried writing NDJSON directly from Python and then loading it with `bq load`, but for various reasons that process took about an hour.)
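For comparison, the NDJSON route boils down to a conversion like the following minimal sketch. The function name and sample data are hypothetical. Each CSV row becomes one JSON object on its own line, which is the shape `bq load --source_format=NEWLINE_DELIMITED_JSON` expects.

```python
import csv
import io
import json


def csv_to_ndjson(csv_text):
    """Convert CSV text to newline-delimited JSON (one JSON object per line)."""
    reader = csv.DictReader(io.StringIO(csv_text, newline=''))
    # json.dumps escapes embedded quotes and newlines, so each record
    # stays on a single output line.
    return ''.join(json.dumps(row, ensure_ascii=False) + '\n' for row in reader)


ndjson = csv_to_ndjson('id,note\n1,"multi\nline"\n2,plain\n')
print(ndjson, end='')
```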


from __future__ import absolute_import

import argparse
from argparse import RawTextHelpFormatter
import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.filebasedsource import FileBasedSource

# gzipped CSV Reading Class that converts to dictionary

class MyCsvFileSource(FileBasedSource):
    def read_records(self, file_name, range_tracker):
        # Need to import these inside the method, otherwise the workers will not recognize the libraries
        # (alternatively, run with --save_main_session)
        import csv
        from io import TextIOWrapper  # Comment this line out when reading gzipped csv
        #import gzip  # Uncomment this line when reading gzipped csv
        from apache_beam.io.gcp.gcsio import GcsIO
        # GcsIO() picks up the default credentials itself; `storage_client` expects a
        # client object, not the path stored in GOOGLE_APPLICATION_CREDENTIALS
        my_gcs_io = GcsIO()
        # newline='' lets the csv module keep line breaks that appear inside quoted fields (assumes UTF-8 input)
        # reader = csv.DictReader(gzip.open(my_gcs_io.open(filename=file_name, mode='r', mime_type='text/csv'), 'rt', encoding='utf-8', newline=''))  # Uncomment this line when reading gzipped csv
        reader = csv.DictReader(TextIOWrapper(my_gcs_io.open(filename=file_name, mode='r', mime_type='text/csv'), encoding='utf-8', newline=''))  # Comment this line out when reading gzipped csv
        for record in reader:
            yield record

# Define the Pipeline

def run(argv=None):
    """This function defines the argument parser and pipeline arguments used to run the dataflow pipeline"""
    # Argument Parser

    parser = argparse.ArgumentParser(
        description="""
        This is an apache beam pipeline that will read a gzipped csv file and write to bigquery.
        The files can be read from GCS or local and written to bigquery in the same project.
        Required Pipeline Arguments:
        - runner
            To run locally specify the flag `--runner=DirectRunner`
            To run in GCP Dataflow specify the flag `--runner=DataflowRunner`
        - project [required only if accessing GCP, not required for local -> local]
            GCP Project ID where the Dataflow job will execute
            e.g. `--project=my-gcp-project`
        - staging_location [can specify local storage as well if running `DirectRunner`]
            Specify a GCS storage location where the Dataflow job can stage the code for workers to execute.
        - temp_location [can specify local storage as well if running `DirectRunner`]
            Specify a GCS storage location where the Dataflow job can stage the data for temporary storage.
        - subnetwork [required for reading from GCP GCS buckets]
            Need to specify a VPC subnetwork for the project using the following format
        """,
        formatter_class=RawTextHelpFormatter)
    parser.add_argument("--input", help='The directory or filename that will be read into the pipeline containing 1 or more gzipped csv files')
    parser.add_argument("--output", help='The `dataset.table` where the records from `--input` will be written to')
    known_args, pipeline_args = parser.parse_known_args()

    # Dataflow Pipeline

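    with beam.Pipeline(argv=pipeline_args) as p:
        (p
         # splittable=False because read_records() reads each file from start to end and ignores
         # range_tracker; if a file were split into byte ranges, every range would re-emit the same rows
         | 'Read Files' >> beam.io.Read(MyCsvFileSource(known_args.input, splittable=False))
         | 'Write to BigQuery' >> WriteToBigQuery(table=known_args.output))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()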
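`csv.DictReader` hands every value to `WriteToBigQuery` as a string, even for the numeric and timestamp columns. The sketch below shows one way to convert types explicitly, so that malformed values fail in Python with a clear error. It could sit between the two steps as `| 'Coerce Types' >> beam.Map(coerce_types)`. The column names, the `COLUMN_TYPES` mapping and the timestamp format are all hypothetical; in practice they would mirror the real table schema.

```python
from datetime import datetime

# Hypothetical column types; in practice these would mirror the BigQuery table schema.
COLUMN_TYPES = {'amount': 'FLOAT', 'quantity': 'INTEGER', 'created_at': 'TIMESTAMP'}
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'  # assumed input format


def coerce_types(record, column_types=COLUMN_TYPES):
    """Return a copy of a csv.DictReader row with typed values for BigQuery.

    Empty strings in non-STRING columns become None (NULL); columns that are
    not listed in column_types are left as strings.
    """
    typed = {}
    for name, value in record.items():
        kind = column_types.get(name, 'STRING')
        if value == '' and kind != 'STRING':
            typed[name] = None
        elif kind == 'INTEGER':
            typed[name] = int(value)
        elif kind == 'FLOAT':
            typed[name] = float(value)
        elif kind == 'TIMESTAMP':
            # Parse to validate the value, then emit an ISO 8601 string.
            typed[name] = datetime.strptime(value, TIMESTAMP_FORMAT).isoformat()
        else:
            typed[name] = value
    return typed


print(coerce_types({'name': 'alpha', 'amount': '10.5', 'quantity': '3',
                    'created_at': '2024-06-16 11:45:58'}))
```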



