如何从s3获取数据并对其进行处理?Python和b

2024-04-28 17:44:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个项目任务,在EMR任务中使用我已经在s3上生成的一些输出数据。所以之前我运行了一个EMR作业,它以多个名为part xxxx的文件的形式在我的一个s3存储桶中生成一些输出。现在我需要从新的EMR作业中访问这些文件,读取这些文件的内容,并使用这些数据生成另一个输出。

这是执行此任务的本地代码:

def reducer_init(self):
        self.idfs = {}
        for fname in os.listdir(DIRECTORY): # look through file names in the directory
            file = open(os.path.join(DIRECTORY, fname)) # open a file
            for line in file: # read each line in json file
                term_idf = JSONValueProtocol().read(line)[1] # parse the line as a JSON object
                self.idfs[term_idf['term']] = term_idf['idf']

    def reducer(self, term_poster, howmany):
        tfidf = sum(howmany) * self.idfs[term_poster['term']]
        yield None, {'term_poster': term_poster, 'tfidf': tfidf}

这在本地运行得很好,但问题是我现在需要的数据在s3上,我需要在reducer_in it函数中以某种方式访问它。

这是我目前所掌握的,但在EC2上执行时失败了:

def reducer_init(self):
self.idfs = {}
b = conn.get_bucket(bucketname)
idfparts = b.list(destination)
    for key in idfparts:
    file = open(os.path.join(idfparts, key))
    for line in file:
    term_idf = JSONValueProtocol().read(line)[1] # parse the line as a JSON object
            self.idfs[term_idf['term']] = term_idf['idf']

def reducer(self, term_poster, howmany):
    tfidf = sum(howmany) * self.idfs[term_poster['term']]
    yield None, {'term_poster': term_poster, 'tfidf': tfidf}

AWS访问信息定义如下:

awskey = '*********'
awssecret = '***********'
conn = S3Connection(awskey, awssecret)
bucketname = 'mybucket'
destination = '/path/to/previous/output'

Tags: inselffors3deflinefiletfidf
1条回答
网友
1楼 · 发布于 2024-04-28 17:44:49

有两种方法:

  1. 将文件下载到本地系统中并对其进行解析。(有点简单,快速和简单)
  2. 将存储在S3上的数据放入内存并对其进行解析(如果文件太大,会更复杂一些)。

步骤1:

在S3上,文件名作为密钥存储,如果有一个名为"Demo"的文件存储在名为"DemoFolder"的文件夹中,则该特定文件的密钥将是"DemoFolder\Demo"

使用以下代码将文件下载到临时文件夹中。

AWS_KEY = 'xxxxxxxxxxxxxxxxxx'
AWS_SECRET_KEY = 'xxxxxxxxxxxxxxxxxxxxxxxxxx'
BUCKET_NAME = 'DemoBucket'
fileName = 'Demo'

conn = connect_to_region(Location.USWest2,aws_access_key_id = AWS_KEY,
            aws_secret_access_key = AWS_SECRET_KEY,
            is_secure=False,host='s3-us-west-2.amazonaws.com'
            )
source_bucket = conn.lookup(BUCKET_NAME)

''' Download the file '''
for name in source_bucket.list():
        if name.name in fileName:
            print("DOWNLOADING",fileName)
            name.get_contents_to_filename(tempPath)

然后可以在该临时路径中处理该文件。

步骤2:

也可以使用data = name.get_contents_as_string()将数据作为字符串获取。如果文件太大(>;1gb),您可能会遇到内存错误,为了避免此类错误,您必须编写一个惰性函数,以分块读取数据。

例如,可以使用range来使用data = name.get_contents_as_string(headers={'Range': 'bytes=%s-%s' % (0,100000000)})获取文件的一部分。

我不确定我是否正确地回答了您的问题,一旦有时间,我可以为您的需求定制代码。同时,请随时发布您的任何疑问。

相关问题 更多 >