我在数据流管道的侧面输入(尤其是BQ)方面遇到了问题,即使在继续coursera并查看了示例之后。在
现在,我有一个管道,它读取gcs存储桶中的文件,获取它们的文件名,然后转换文件并将给定数量的行写入bigquery。我试图找出如何将文件名映射到bq中的特定“key”。在
result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
result = reduce(add, result)
#create each input PCollection name
variables = ['p{}'.format(i) for i in range(len(result))]
根据结果,我构建一个包含所有文件名(filename1,filename2…)的元组动态创建一个查询,即:
Bqquery = "SELECT FILENAME, FILE_ID from 'project:dataset.table' where FILENAME IN (filename tuple)"
我想我应该这样做,因为一次大约有20个文件,所以从bq获取数据一次是有意义的,而不是在for循环中获取文件的id。在
所以我做了
^{pr2}$我还尝试在for循环中执行查询,一次只获取一个文件名(请参阅注释掉的代码),但这也不起作用。最终我要做的是将beam.Pardo(AddFilenamesFn(), current_file)
更改为beam.Pardo(AddFileNamesFn(), file_id)
,所以我添加的不是实际的文件名,而是添加fileid
[注意代码中提到的标签(即read_labels[i])只是数据流的标签]
我想我错过了一些关于pcollections的基本知识,但不确定
考虑到前面的question中的代码,我认为最简单的解决方案是在for循环中的
AddFilenamesFn
ParDo中运行查询。请记住,beam.io.Read(beam.io.BigQuerySource(query=bqquery))
用于将行作为源读取,而不是在中间步骤中。因此,在我建议的情况下,您可以直接使用Python客户机库(google-cloud-bigquery>0.27.0
):这将是实施的最直接的解决方案,但可能会产生问题。我们不是在管道开始时运行所有20个可能的查询,而是对每一行/记录运行一个查询。例如,如果一个文件中有3000个元素,则同一个查询将被启动3000次。但是,每个不同的查询实际上应该只运行一次,随后的查询“repeats”将命中cache。还请注意,缓存的查询不会对交互式查询limit起作用。在
我使用了与我之前的answer相同的文件:
^{pr2}$并添加了一个新表:
输出是:
另一种解决方案是加载所有的表并用
beam.io.BigQuerySource()
将其具体化为一个侧面输入(这当然会有问题),或者,如您所说,将其分解为N个查询,并将每个查询保存到不同的侧面输入中。然后您可以为每个记录选择适当的一个,并将其作为附加输入传递给AddFilenamesFn
。写这篇文章也很有意思。在我提出的第一个解决方案的完整代码:
相关问题 更多 >
编程相关推荐