我有一个项目任务,在EMR任务中使用我已经在s3上生成的一些输出数据。所以之前我运行了一个EMR作业,它以多个名为part xxxx的文件的形式在我的一个s3存储桶中生成一些输出。现在我需要从新的EMR作业中访问这些文件,读取这些文件的内容,并使用这些数据生成另一个输出。
这是执行此任务的本地代码:
def reducer_init(self):
self.idfs = {}
for fname in os.listdir(DIRECTORY): # look through file names in the directory
file = open(os.path.join(DIRECTORY, fname)) # open a file
for line in file: # read each line in json file
term_idf = JSONValueProtocol().read(line)[1] # parse the line as a JSON object
self.idfs[term_idf['term']] = term_idf['idf']
def reducer(self, term_poster, howmany):
tfidf = sum(howmany) * self.idfs[term_poster['term']]
yield None, {'term_poster': term_poster, 'tfidf': tfidf}
这在本地运行得很好,但问题是我现在需要的数据在s3上,我需要在reducer_in it函数中以某种方式访问它。
这是我目前所掌握的,但在EC2上执行时失败了:
def reducer_init(self):
self.idfs = {}
b = conn.get_bucket(bucketname)
idfparts = b.list(destination)
for key in idfparts:
file = open(os.path.join(idfparts, key))
for line in file:
term_idf = JSONValueProtocol().read(line)[1] # parse the line as a JSON object
self.idfs[term_idf['term']] = term_idf['idf']
def reducer(self, term_poster, howmany):
tfidf = sum(howmany) * self.idfs[term_poster['term']]
yield None, {'term_poster': term_poster, 'tfidf': tfidf}
AWS访问信息定义如下:
awskey = '*********'
awssecret = '***********'
conn = S3Connection(awskey, awssecret)
bucketname = 'mybucket'
destination = '/path/to/previous/output'
有两种方法:
步骤1:
在S3上,文件名作为密钥存储,如果有一个名为
"Demo"
的文件存储在名为"DemoFolder"
的文件夹中,则该特定文件的密钥将是"DemoFolder\Demo"
。使用以下代码将文件下载到临时文件夹中。
然后可以在该临时路径中处理该文件。
步骤2:
也可以使用
data = name.get_contents_as_string()
将数据作为字符串获取。如果文件太大(>;1gb),您可能会遇到内存错误,为了避免此类错误,您必须编写一个惰性函数,以分块读取数据。例如,可以使用
range
来使用data = name.get_contents_as_string(headers={'Range': 'bytes=%s-%s' % (0,100000000)})
获取文件的一部分。我不确定我是否正确地回答了您的问题,一旦有时间,我可以为您的需求定制代码。同时,请随时发布您的任何疑问。
相关问题 更多 >
编程相关推荐