Python Dataflow templates: making runtime parameters globally accessible

So, the goal of the pipeline is to be able to use runtime variables to open a CSV file and to name a BigQuery table.

What I need is to access these variables globally, or inside a ParDo, for instance by passing them into a function.
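For context, the usual Beam pattern here is to pass the ValueProvider object itself into a DoFn and defer .get() until process() runs on the workers (a minimal sketch; LogPathFn is an illustrative name, not code from this post):

import apache_beam as beam

class LogPathFn(beam.DoFn):
    def __init__(self, path):
        # Keep the ValueProvider itself; its value is not known yet
        # at graph-construction time.
        self.path = path

    def process(self, element):
        # .get() is only valid once the pipeline is actually running.
        yield self.path.get()

# Usage: p | beam.Create(["Start"]) | beam.ParDo(LogPathFn(custom_options.path))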

I tried creating a dummy string and then running a FlatMap to access the runtime parameters and make them global, but it returns nothing.

For example:

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class CustomPipelineOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='csv storage path')
        parser.add_value_provider_argument(
            '--table_name',
            type=str,
            help='Table Id')

def run():
    def rewrite_values(element):
        """ Rewrite default env values"""
        # global project_id
        # global campaign_id
        # global organization_id
        # global language
        # global file_path
        try:
            logging.info("File Path with str(): {}".format(str(custom_options.path)))
            logging.info("----------------------------")
            logging.info("element: {}".format(element))
            project_id = str(cloud_options.project)
            file_path = custom_options.path.get()
            table_name = custom_options.table_name.get()

            logging.info("project: {}".format(project_id))
            logging.info("File path: {}".format(file_path))
            logging.info("language: {}".format(table_name))
            logging.info("----------------------------")
        except Exception as e:
            logging.info("Error format----------------------------")
            raise KeyError(e)

        return file_path

    pipeline_options = PipelineOptions()
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Beginning of the pipeline
    p = beam.Pipeline(options=pipeline_options)

    init_data = (p
                 | beam.Create(["Start"])
                 | beam.FlatMap(rewrite_values))

    # ... pipeline processing, running the pipeline, etc.
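For reference, a pipeline like this would be launched directly with the custom options on the command line, for instance (the file name pipeline.py is an assumption):

python pipeline.py \
    --project $PROJECT \
    --path gs://$BUCKET/input/data.csv \
    --table_name my_dataset.my_table

When launched directly like this, the arguments arrive as StaticValueProviders and .get() resolves immediately; only when the pipeline is staged as a template do they become RuntimeValueProviders, which is where the behaviour described below appears.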

I can access the project variable without issue, but everything else comes back empty.

If I declare the custom_options variable as global, or pass the specific custom object into a function, e.g. | 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path)), it only returns something like RuntimeValueProvider(option: path, type: str, default_value: None).

If I use | 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path.get())), the variable is an empty string.

So essentially I just need to access these variables globally. Is that possible?

Finally, to clarify: I don't want to use the ReadFromText method. I could use the runtime variable there, but merging the runtime options into the dict created from the CSV file would be expensive, since I'm working with huge CSV files.
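For completeness, one way to read a runtime-determined path without ReadFromText is ReadAllFromText, which expands a PCollection of file patterns at run time (a sketch built on the options objects above, not the approach taken in this post):

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

# Resolve the runtime path inside the pipeline, then expand it into lines.
lines = (p
         | 'Seed' >> beam.Create([None])
         | 'Resolve path' >> beam.Map(lambda _: custom_options.path.get())
         | 'Read CSV lines' >> ReadAllFromText())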


1 Answer

For me it works by declaring cloud_options and custom_options as global:

import argparse, logging

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class CustomPipelineOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='csv storage path')
        parser.add_value_provider_argument(
            '--table_name',
            type=str,
            help='Table Id')

def rewrite_values(element):
    """ Rewrite default env values"""
    # global project_id
    # global campaign_id
    # global organization_id
    # global language
    # global file_path
    try:
        logging.info("File Path with str(): {}".format(str(custom_options.path.get())))
        logging.info("----------------------------")
        logging.info("element: {}".format(element))
        project_id = str(cloud_options.project)
        file_path = custom_options.path.get()
        table_name = custom_options.table_name.get()

        logging.info("project: {}".format(project_id))
        logging.info("File path: {}".format(file_path))
        logging.info("Table name: {}".format(table_name))
        logging.info("----------------------------")
    except Exception as e:
        logging.info("Error format----------------------------")
        raise KeyError(e)

    return file_path


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  global cloud_options
  global custom_options

  pipeline_options = PipelineOptions(pipeline_args)
  cloud_options = pipeline_options.view_as(GoogleCloudOptions)
  custom_options = pipeline_options.view_as(CustomPipelineOptions)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  # Beginning of the pipeline
  p = beam.Pipeline(options=pipeline_options)

  init_data = (p
               | beam.Create(["Start"])
               | beam.FlatMap(rewrite_values))

  result = p.run()
  # result.wait_until_finish()

if __name__ == '__main__':
  run()

After setting the PROJECT and BUCKET variables, I staged the template.

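A typical staging invocation looks like the following (a sketch; the script name global_options.py and the staging/temp paths are assumptions):

python global_options.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --staging_location gs://$BUCKET/staging \
    --temp_location gs://$BUCKET/temp \
    --template_location gs://$BUCKET/templates/global_options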

and executed it, providing the path and table_name options:

gcloud dataflow jobs run global_options \
    --gcs-location gs://$BUCKET/templates/global_options \
    --parameters path=test_path,table_name=test_table

The runtime parameters appear to be logged fine inside the FlatMap.

