数据流管道中的TextIOWrapper正在运行

2024-06-16 11:45:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我将csv文件从GCS加载到BigQuery中,并通过cloudcomposer触发该任务(然后再做一些其他的工作)。由于某些字段中存在各种字符,bq load命令无法正确解析文件,因此我将转向Dataflow以帮助解析和加载。有8个文件每个~1GB大小。它有96列数据和大约3米的记录直接从GCS加载到BQ。大多数字段都是字符串,有一些数字和时间戳类型。在

我的管道在运行,但速度非常慢。我可以成功地将这些文件读入BigQuery,但是管道在18分钟的运行时间内自动扩展到+15个worker,此时它只处理了~300k行。用户界面显示它几乎不需要推送300个元素/秒

我尝试过各种其他的在线解决方案,但是我需要的数据是未经编辑的(不能去掉奇怪的字符)和其他一些解决方案尝试使用re来分割逗号,但是字符串字段中到处都是逗号,所以这对我不起作用。(也有管道,标签和任何潜在的字符主机,所以在其他东西上划界也不是一个真正有用的选择)。该解决方案的优点是能够使用apache_beam.Map来并行化对记录执行的操作,但是它的执行不正确,给我带来了一些糟糕的结果,某些记录上的数据丢失或损坏。在

csv库是唯一能够始终正确解析文件而不会丢失数据的东西。所以我将打开的GCS文件传递给csv.DictReader,以便直接写入BQ。无论我使用的是apache_beam.io.FileBasedSource类中的self.open_file()方法,还是apache_beam.io.gcp.gcsio.GcsIO类中的open方法,我都会得到一个_io.BufferedReader,它向我抛出字节而不是字符串。因此,我使用io.TextIOWrapper来获取字符串而不是字节,这似乎“有效”,但运行速度非常慢,如上所述。在

我还尝试了一开始gzip压缩csv文件并将其读入。不用io.TextIOWrapper我可以用gzip.open(_io.BufferedReader, 'rt'),这就像一个符咒。在这种情况下,管道一直在运行,大约20分钟就完成了(根据dataflow声称它能做的,感觉还是很长的一段时间,但是如果这是我能得到的最好的结果,那么我就可以接受它了)。TextIOWrapper似乎正在显著地减慢它的速度(只是我的猜测),而其他使用{的解决方案似乎也不起作用。在

{14{runner在本地运行的cd8}不到一分钟。所以现在我有点困惑。在

(我也尝试过直接用python编写NLD_JSON然后使用bq load来运行它,但是由于各种原因,这个过程花费了一个小时的时间

下面是我的管道,其中的注释部分显示了对gzip文件执行所需的更改:

from __future__ import absolute_import

import argparse
from argparse import RawTextHelpFormatter
import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.filebasedsource import FileBasedSource

#############################################
# gzipped CSV Reading Class that converts to dictionary
#############################################


class MyCsvFileSource(FileBasedSource):
    def read_records(self, file_pattern, range_tracker):
        import os  # Need to import these inside of class otherwise the Pipeline will not recognize the library
        import csv
        from io import TextIOWrapper  # Comment this line out when reading gzipped csv
        #import gzip  # Uncomment this line when reading gzipped csv
        from apache_beam.io.gcp.gcsio import GcsIO
        my_gcs_io = GcsIO(storage_client=os.getenv('GOOGLE_APPLICATION_CREDENTIALS')) 
        # reader = csv.DictReader(gzip.open(my_gcs_io.open(filename=file_pattern, mode='r', mime_type='text/csv'), 'rt'))  # Uncomment this line when reading gzipped csv
        reader = csv.DictReader(TextIOWrapper(my_gcs_io.open(filename=file_pattern, mode='r', mime_type='text/csv')))  # Comment this line out when reading gzipped csv
        for record in reader:
            yield record

#############################################
# Define the Pipeline
#############################################


def run(argv=None):
    """This function defines the argument parser and pipeline arguments used to run the dataflow pipeline"""
    #############################################
    # Argument Parser
    #############################################

    parser = argparse.ArgumentParser(
        description="""
        This is an apache beam pipeline that will read a gzipped csv file and write to bigquery.
        The files can be read from GCS or local and written to bigquery in the same project.
        Required Pipeline Arguments:
        - runner
            To run locally specify the flag `--runner=DirectRunner`
            To run in GCP Dataflow specify the flag `--runner=DataflowRunner`
        - project [required only if accessing GCP, not required for local -> local]
            GCP Project ID where the Dataflow job will execute
            e.g. `--project=my-gcp-project`
        - stagingLocation [can specify local storage as well if running `DirectRunner`]
            Specify a GCS storage location where the Dataflow job can stage the code for workers to execute.
        - temp_location [can specify local storage as well if running `DirectRunner`]
            Specify a GCS storage location where the Dataflow job can stage the data for temporary storage.
        - subnetwork [required for reading from GCP GCS buckets]
            Need to specify a VPC subnetwork for the project using the following format
            `--subnetwork=regions/<REGION_NAME>/subnetworks/<SUB_NETWORK_NAME>`
        """,
        formatter_class=RawTextHelpFormatter)
    parser.add_argument("--input", help='The directory or filename that will be read into the pipeline containting 1 or more gzipped csv files')
    parser.add_argument("--output", help='The `dataset.table` where the records from `--input` will be written to')
    known_args, pipeline_args = parser.parse_known_args()

    #############################################
    # Dataflow Pipeline
    #############################################

    with beam.Pipeline(argv=pipeline_args) as p:
        (p
         | 'Read Files' >> beam.io.Read(MyCsvFileSource(known_args.input))
         | 'Write to BigQuery' >> WriteToBigQuery(table=known_args.output,
                                                  create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                                                  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

我希望这条管道在从GCS到BigQuery的所有8个常规csv文件上运行不到20分钟,并且在这个过程中不会丢弃任何记录。在

希望你们能提供任何帮助。在


Tags: 文件csvthetofromioimport管道
1条回答
网友
1楼 · 发布于 2024-06-16 11:45:58

**不是对您问题的回答,而是另一种方法**

我知道您正在尝试清理数据,同时加载到BQ。您可能想要探索clouddataprep(在GCP控制台的bigdataprep部分下),这对于清理数据和数据类型非常直观(它是为转换数据而构建的)。然后,您可以将清理后的数据保存回GCS,并从bqui本身启动一个加载作业来填充bigquery表。在

相关问题 更多 >