WriteToAvro does not write data to the file after reading from BigQuery using a Dataflow template

Posted 2024-06-16 11:22:55


I have been racking my brains over this for a month, but I cannot get WriteToAvro to write data to a GCS bucket:

from __future__ import absolute_import
from __future__ import division

from datetime import datetime, timedelta, date
import argparse
import logging
import re
import os
import sys
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.io.avroio import WriteToAvro
from apache_beam.runners.runner import PipelineState
from apache_beam.io.gcp import bigquery_tools
import threading
import time
import types 
from fastavro import parse_schema

from google.cloud import bigquery
import google.cloud.logging

class CustomParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        from datetime import datetime
        parser.add_value_provider_argument('--projectname'
                                           , type=str
                                           , help='BigQuery project name to dntl data from')
        parser.add_value_provider_argument('--jobname'
                                           , type=str
                                           , help='Dataflow JobName'
                                                'format: Jobname')
        #parser.add_value_provider_argument('--input_query'
        #                                   , help='Dataflow Input Subscription Name'
        #                                        'format: input_subscription')
        parser.add_value_provider_argument('--output_path'
                                           , type=str
                                           , help='GCS path and name of File'
                                                'format: gs://BUCKET_NAME/FOLDER_NAME/FILE_NAME')

QUERY_BODY = "pubsub_id AS CNTCT_EVENT_ID"

schema = {
          "name": "Transaction",
          "type": "record",
          "fields": [
              {"name": "CNTCT_EVENT_ID", "type": ["null", "long"]}
          ]
         } 

schema_parsed = parse_schema(schema)

def run(argv=None):
    options = PipelineOptions()
    p = beam.Pipeline(options=options)
    known_args = options.view_as(CustomParams)
    options.view_as(WorkerOptions).use_public_ips = False
    options.view_as(SetupOptions).save_main_session = True
    gcp_project = options.view_as(GoogleCloudOptions).project
    if 'work-dv' in gcp_project:
        select_query = "SELECT {0} \
        FROM `cio-datahub-work-dv-c03a6c.work_cust_intractn.usage_event_content_append`".format(QUERY_BODY)
    elif 'work-qa' in gcp_project:
        select_query = "SELECT {0} \
        FROM `cio-datahub-work-dv-c03a6c.work_cust_intractn.usage_event_content_append`".format(QUERY_BODY)
        
    rows = p | 'READ FROM Table' >> beam.io.Read(beam.io.BigQuerySource(query=select_query, use_standard_sql=True))

    writeDataToAvro = (rows 
                       |'WRITE TO AvroFile' >> WriteToAvro(known_args.output_path
                                        , schema_parsed
                                        , file_name_suffix='.avro'
                                        , use_fastavro = True
                                       )
         )
    
    result=p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
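
For what it's worth, BigQuerySource hands each row to the rest of the pipeline as a Python dict keyed by column name, so the element reaching WriteToAvro for the query above should look like {"CNTCT_EVENT_ID": 12345}. A quick local sanity check of that record shape against the schema, done with plain fastavro outside Beam (the value is made up), is:

from fastavro import parse_schema, writer

records = [{"CNTCT_EVENT_ID": 12345}]            # shape of a row as the pipeline sees it
with open("/tmp/sanity_check.avro", "wb") as fh:
    writer(fh, parse_schema(schema), records)    # `schema` is the dict defined above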

I have tried the following things:

  1. Reading the schema with avro.schema.Parse -> gives an error that the schema cannot be read: the JSON object must be str, bytes or bytearray, not dict (see the schema sketch after this list).
  2. When I fixed that by putting the schema inside a triple-quoted string ("""...""") in the code, running it gave an error that fastavro is being used while the schema is an avro (avro-python3) one.
  3. When I resolved that error by passing use_fastavro=False, I got: tried to map key "CNTCT_EVENT_ID" to value <avro.schema.Field object at 0x7f05abfb7950> in ImmutableDict {}.
  4. After defining an AvroFileSink(), I tried WriteToFiles(path=job_options.outputLocation, sink=sink), but again only the file was created and no data was written, just like in "Beam streaming pipeline does not write files to bucket" (a sketch of that sink follows the schema sketch below).
  5. When I tried converting the read rows to JSON, I got a fastavro traceback made up of repeated 'File "fastavro/_write.pyx"' frames, ending in AttributeError: 'str' object has no attribute 'get'.
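
For reference on attempts 1-3, the two schema APIs expect different input types as far as I understand them: fastavro.parse_schema takes a Python dict, while avro.schema.Parse takes a JSON string. A minimal standalone sketch of both (field names copied from above, everything else illustrative):

import json
from fastavro import parse_schema   # expects a dict
from avro.schema import Parse       # expects a JSON string

schema_dict = {
    "namespace": "example.avro",
    "name": "Transaction",
    "type": "record",
    "fields": [{"name": "CNTCT_EVENT_ID", "type": ["null", "long"]}],
}

# fastavro flavour, matching use_fastavro=True
fastavro_schema = parse_schema(schema_dict)

# avro-python3 flavour, matching use_fastavro=False; note the json.dumps
avro_schema = Parse(json.dumps(schema_dict))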

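The sink from attempt 4 looked roughly like the following; this is only a sketch of the fileio pattern, assuming fastavro's incremental Writer class is available, and the names are illustrative rather than the exact code I ran:

from apache_beam.io import fileio
from fastavro.write import Writer   # assumption: incremental Avro writer in this fastavro version

class AvroFileSink(fileio.FileSink):
    """Writes each element of the PCollection into one Avro container file per shard."""
    def __init__(self, schema):
        self._schema = schema

    def open(self, fh):
        # fh is the file handle Beam opens for the current shard
        self._writer = Writer(fh, self._schema)

    def write(self, record):
        self._writer.write(record)

    def flush(self):
        self._writer.flush()

# wiring it up (output_path is a placeholder):
# rows | 'WRITE TO AvroFile' >> fileio.WriteToFiles(path=output_path, sink=AvroFileSink(schema_parsed))
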
Everything works in the JupyterLab environment, but as soon as I create a template it fails, and I don't know why. Could someone please help me with this?

Edit: As requested, the JupyterLab code that runs with the DirectRunner:

from __future__ import absolute_import
from __future__ import division

import argparse
import logging
import sys
import os
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.avroio import WriteToAvro
from fastavro import parse_schema
from avro.schema import Parse

from datetime import datetime
import json

import threading
import time
import types   
from apache_beam.runners.runner import PipelineState

#pip install --upgrade fastavro      <- Need to do this, else you will get a "Writer not found" error
#pip install --upgrade avro-python3  <- Need to do this, or else you will get an error due to the schema

PROJECT_ID = 'cio-sea-team-lab-9e09db'
BUCKET = 'cio-sea-team-lab-9e09db-ecp-project'
# load the Service Account json file to allow GCP resources to be used
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "cio-sea-team-lab-9e09db-0b2782fa290b.json"

class Printer(beam.DoFn):
    """To Print the output on the Console Using ParDo"""
    def process(self,data_item):
        print (data_item)
        
select_query = 'SELECT \
pubsub_id AS CNTCT_EVENT_HEADER_ID \
FROM `cio-sea-team-lab-9e09db.ecp_project.outbound_contact_event_pubsub`'

#WORKS WITH AVRO.SCHEMA.PARSE (commented out)
#SCHEMA = """{"namespace": "example.avro"
#          , "name": "Transaction"
#          , "type": "record"
#          , "fields": [
#              {"name": "CNTCT_EVENT_HEADER_ID", "type": ["null", "int"]}
#          ]
#         }"""
#
#schema_parsed = Parse(SCHEMA)

#WORKS WITH FASTAVRO.PARSE_SCHEMA CURRENTLY IN USE
SCHEMA = {"namespace": "example.avro"
          , "name": "Transaction"
          , "type": "record"
          , "fields": [
              {"name": "CNTCT_EVENT_HEADER_ID", "type": ["null", "int"]}
          ]
         }
schema_parsed = parse_schema(SCHEMA)         

def run(argv=None):
    pipeline_args = [
        '--project={0}'.format(PROJECT_ID),
        '--job_name=bq-to-bq-dtl',
        '--region=northamerica-northeast1',
        '--save_main_session',
        '--staging_location=gs://{0}/misc-temp/'.format(BUCKET),
        '--temp_location=gs://{0}/misc-temp/'.format(BUCKET),
        '--subnetwork=https://www.googleapis.com/compute/alpha/projects/cio-sea-team-lab-9e09db/regions/northamerica-northeast1/subnetworks/sealabsubnet',
        '--runner=DirectRunner'
    ]
    
    #parser = argparse.ArgumentParser()
    #known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(WorkerOptions).use_public_ips = False
    
    #with beam.Pipeline(options=pipeline_options) as p:
    p = beam.Pipeline(options=pipeline_options)
    # Read the table rows into a PCollection.
    rows = p | 'READ FROM Staging Table' >> beam.io.Read(beam.io.BigQuerySource(query = select_query
                                                                                , use_standard_sql=True))


    # Write the output using a "Write" transform that has side effects.
    x = (rows 
         #|'Print' >>  beam.ParDo(Printer())
         | 'WriteToText' >> WriteToAvro('gs://cio-sea-team-lab-9e09db-ecp-project/tgtoutput/CNTCT_EVENT_HEADER/CNTCT_EVENT_HEADER_GCP_'+ datetime.now().strftime("%Y%m%d%H%M")
                                                , file_name_suffix='.avro'
                                                , schema=schema_parsed
                                                #, use_fastavro=False  <- only needed when the avro-python3 schema is used; otherwise it can be omitted
                                                )
         )
    
    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
