我需要在谷歌云存储中提取zip文件中的文件。我使用python函数来实现这一点,但即使使用Dask集群,并且每个Dask工作进程都有20GB的内存限制,我仍然会遇到内存问题
我如何优化我的代码,使其不消耗那么多内存?也许将zip文件分块读取并流式传输到临时文件,然后将该文件发送到Google云存储
请在此提供指导
这是我的密码:
@task
def unzip_files(
bucket_name,
zip_data
):
file_date = zip_data['file_date']
gcs_folder_path = zip_data['gcs_folder_path']
gcs_blob_name = zip_data['gcs_blob_name']
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
destination_blob_pathname = f'{gcs_folder_path}/{gcs_blob_name}'
blob = bucket.blob(destination_blob_pathname)
zipbytes = io.BytesIO(blob.download_as_string())
if is_zipfile(zipbytes):
with ZipFile(zipbytes, 'r') as zipObj:
extracted_file_paths = []
for content_file_name in zipObj.namelist():
content_file = zipObj.read(content_file_name)
extracted_file_path = f'{gcs_folder_path}/hgdata_{file_date}_{content_file_name}'
blob = bucket.blob(extracted_file_path)
blob.upload_from_string(content_file)
extracted_file_paths.append(f'gs://{bucket_name}/{extracted_file_path}')
return extracted_file_paths
else:
return []
我不太了解您的代码,但一般来说,dask使用
fsspec
和gcsfs
库很好地处理了这样复杂的文件操作。例如(这不需要Dask)你可以这样做
并用Dask并行循环
相关问题 更多 >
编程相关推荐