在Python中从AWS S3读取gzip文件的内容

2024-04-25 14:01:23 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图从我在AWS中运行的Hadoop进程中读取一些日志。日志存储在S3文件夹中,路径如下。

bucketname=名称 键=y/z/stderr.gz 这里Y是集群id,z是文件夹名。这两个在AWS中都充当文件夹(对象)。所以完整路径类似于x/y/z/stderr.gz。

现在我想解压缩这个.gz文件并读取文件的内容。我不想将此文件下载到我的系统中,希望将内容保存在python变量中。

这是我到现在为止一直在尝试的。

bucket_name = "name"
key = "y/z/stderr.gz"
obj = s3.Object(bucket_name,key)
n = obj.get()['Body'].read()

这给了我一个不可读的格式。我也试过了

n = obj.get()['Body'].read().decode('utf-8')

这会导致错误utf8'编码解码器无法在位置1解码字节0x8b:起始字节无效。

我也试过

gzip = StringIO(obj)
gzipfile = gzip.GzipFile(fileobj=gzip)
content = gzipfile.read()

这将返回一个错误IOError:Not a gzip file

不知道如何解码这个.gz文件。

编辑-找到解决方案。需要传入n并使用BytesIO

gzip = BytesIO(n)

Tags: 文件keyname路径文件夹awsobj内容
3条回答

用python从aws s3读取Bz2扩展文件

import json
import boto3
from io import BytesIO
import bz2
try:
    s3 = boto3.resource('s3')
    key='key_name.bz2'
    obj = s3.Object('bucket_name',key)
    nn = obj.get()['Body'].read()
    gzipfile = BytesIO(nn)
    content = bz2.decompress(gzipfile.read())
    content = content.split('\n')
    print len(content)

except Exception as e:
    print(e)

@Amit,我试着做同样的事情来测试解码一个文件,并让你的代码运行一些修改。我只需要删除函数def、返回并重命名gzip变量,因为该名称正在使用中。

import json
import boto3
from io import BytesIO
import gzip

try:
     s3 = boto3.resource('s3')
     key='YOUR_FILE_NAME.gz'
     obj = s3.Object('YOUR_BUCKET_NAME',key)
     n = obj.get()['Body'].read()
     gzipfile = BytesIO(n)
     gzipfile = gzip.GzipFile(fileobj=gzipfile)
     content = gzipfile.read()
     print(content)
except Exception as e:
    print(e)
    raise e

可以使用AWS S3选择对象内容读取gzip内容

S3 Select是Amazon S3的一个功能,旨在只从对象中提取所需的数据,这可以显著提高性能并降低需要在S3中访问数据的应用程序的成本。

Amazon S3 Select对以Apache Parquet格式存储的对象、JSON数组以及CSV和JSON对象的BZIP2压缩进行工作。

参考号:https://docs.aws.amazon.com/AmazonS3/latest/dev/selecting-content-from-objects.html

from io import StringIO
import boto3
import pandas as pd

bucket = 'my-bucket'
prefix = 'my-prefix'

client = boto3.client('s3')

for object in client.list_objects_v2(Bucket=bucket, Prefix=prefix)['Contents']:
    if object['Size'] <= 0:
        continue

    print(object['Key'])
    r = client.select_object_content(
            Bucket=bucket,
            Key=object['Key'],
            ExpressionType='SQL',
            Expression="select * from s3object",
            InputSerialization = {'CompressionType': 'GZIP', 'JSON': {'Type': 'DOCUMENT'}},
            OutputSerialization = {'CSV': {'QuoteFields': 'ASNEEDED', 'RecordDelimiter': '\n', 'FieldDelimiter': ',', 'QuoteCharacter': '"', 'QuoteEscapeCharacter': '"'}},
        )

    for event in r['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            payloads = (''.join(r for r in records))
            try:
                select_df = pd.read_csv(StringIO(payloads), error_bad_lines=False)
                for row in select_df.iterrows():
                    print(row)
            except Exception as e:
                print(e)

相关问题 更多 >