因此,管道的目标是能够使用运行时变量来打开csv文件并命名BigQuery表。在
我需要的是能够全局访问这些变量,或者在ParDo中访问,比如将其解析为函数。在
我尝试创建一个伪字符串,然后运行一个FlatMap来访问运行时参数并使它们全局化,尽管它没有返回任何结果。在
例如
class CustomPipelineOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--path',
type=str,
help='csv storage path')
parser.add_value_provider_argument(
'--table_name',
type=str,
help='Table Id')
def run()
def rewrite_values(element):
""" Rewrite default env values"""
# global project_id
# global campaign_id
# global organization_id
# global language
# global file_path
try:
logging.info("File Path with str(): {}".format(str(custom_options.path)))
logging.info("----------------------------")
logging.info("element: {}".format(element))
project_id = str(cloud_options.project)
file_path = custom_options.path.get()
table_name = custom_options.table_name.get()
logging.info("project: {}".format(project_id))
logging.info("File path: {}".format(file_path))
logging.info("language: {}".format(table_name))
logging.info("----------------------------")
except Exception as e:
logging.info("Error format----------------------------")
raise KeyError(e)
return file_path
pipeline_options = PipelineOptions()
cloud_options = pipeline_options.view_as(GoogleCloudOptions)
custom_options = pipeline_options.view_as(CustomPipelineOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
# Beginning of the pipeline
p = beam.Pipeline(options=pipeline_options)
init_data = (p
| beam.Create(["Start"])
| beam.FlatMap(rewrite_values))
pipeline processing, running pipeline etc.
我可以访问project变量没有问题,尽管其他所有内容都返回为空。在
如果将custom\u options变量设为全局变量,或者将特定的custom对象传递到函数中,例如:| 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path))
,那么它只返回RuntimeValueProvider(option: path, type: str, default_value: None)
之类的内容。在
如果我使用| 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path.get()))
,变量是and空字符串。在
所以本质上,我只需要全局访问这些变量,有可能吗?在
最后要澄清的是,我不想使用ReadFromText
方法,我可以在那里使用runtime变量,尽管将运行时选项合并到从csv文件创建的dict中会很昂贵,因为我正在处理巨大的csv文件。在
对我来说,它通过将}声明为
cloud_options
和{global
来工作:在设置了
^{pr2}$PROJECT
和BUCKET
变量之后,我用以下方法对模板进行了分段:并通过提供
path
和table_name
选项来执行它:运行时参数似乎在FlatMap中记录得很好:
相关问题 更多 >
编程相关推荐