使用ValueProvider格式化D中的BigQuery

2021-06-13 13:51:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我目前正在使用Dataflow用python做一个循环批处理。你知道吗

基本上我从bigquery中读取数据并在上面做一些事情。。我的管道看起来像这样

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
             | "doing stuff" >> beam.Map(do_some_stuff)
             )

我想使用数据流模板来运行作业,以使其适应运行时。你知道吗

多亏了文档https://cloud.google.com/dataflow/docs/guides/templates/creating-templates,函数部分的Using ValueProvider,我在运行时使用ParDo给了“do\u some\u stuff”一个额外的参数。你知道吗


class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)
class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
      self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
      yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY),
                                                                                     use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

但是我还想更改进程所关注的用户数,因此我想使查询适应运行时。你知道吗


class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_nb_users',
                                           default=100,
                                           type=int)
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)
class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
      self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
      yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
                                                                                     use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

。。。这不起作用,因为我在管道执行之前调用get()。到目前为止,我还没有将我为do\u some\u stuff函数所做的工作调整到“Read”行

任何关于如何进行的建议或解决方案都将不胜感激。谢谢!你知道吗