Avoid recomputing the size of all Cloud Storage files in the Beam Python SDK



I'm working on a pipeline that reads roughly 5 million files from a Google Cloud Storage (GCS) directory. I have it configured to run on Google Cloud Dataflow.

The problem is that when I start the pipeline, it spends hours "computing the size" of all the files:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]

As you can see, it took an hour and a half (5,549 seconds) to compute the size of roughly 5.5 million files, and then it started all over again! It took another 2 hours to run the second pass, and then it started a third! As of this writing, the job still doesn't appear in the Dataflow console, which leads me to believe this is all happening on my local machine and not taking advantage of any distributed computing.

When I test the pipeline with a smaller input dataset (2 files), it repeats the size estimation 4 times:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.

At this rate, it would take roughly 8 hours just to perform the GCS size estimation of all 5.5 million files 4 times, before the Dataflow job even starts.

My pipeline is configured with the --runner=DataflowRunner option, so it should be running on Dataflow:

python bigquery_import.py --runner=DataflowRunner #other options...

The pipeline reads from GCS as follows:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',
    required=True,
    help='Input Cloud Storage directory to process.')
# argv comes from the surrounding main(argv) entry point.
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=pipeline_options) as p:
    # e.g. --input=gs://project/dir/*.har.gz
    files = p | beam.io.ReadFromText(known_args.input)

See bigquery_import.py on GitHub for the complete code.

I don't understand why this tedious process happens outside of the Dataflow environment, or why it needs to be performed multiple times. Am I reading the files from GCS correctly, or is there a more efficient way?


1 Answer

Thanks for reporting this. Beam has two transforms for reading text: ReadFromText and ReadAllFromText. ReadFromText will run into this issue, but ReadAllFromText should not:

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438

The downside of ReadAllFromText is that it does not perform dynamic work rebalancing, but that should not be a problem when reading a large number of files.
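
For reference, a minimal sketch of the swap (reusing the glob and pipeline options from the question): the file pattern is fed through beam.Create into ReadAllFromText, so the pattern expansion and reading should happen as part of pipeline execution rather than during local graph construction:

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

with beam.Pipeline(options=pipeline_options) as p:
    files = (
        p
        # A one-element PCollection holding the file pattern itself.
        | beam.Create(['gs://project/dir/*.har.gz'])
        # Expand the pattern and read the matching files inside the job;
        # the default AUTO compression handles the .gz extension.
        | ReadAllFromText())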

I created https://issues.apache.org/jira/browse/BEAM-9620 to track this issue with ReadFromText (and file-based sources in general).
