I have a function that streams zip data from S3 into a BytesIO buffer, and from that buffer I build chunks of 5000 lines each. Now I am trying to write these chunks back to the S3 bucket as separate files. Because this runs on AWS Lambda I cannot let a single thread do all of the work, since Lambda times out after the 5-minute limit. Coming from Java, a threaded implementation would be straightforward, but in Python I am confused about how to set up a thread pool to handle the uploading of files to S3. Here is my code:
import io
import zipfile
import boto3
import sys
import multiprocessing
# from multiprocessing.dummy import Pool as ThreadPool
import time
s3_client = boto3.client('s3')
s3 = boto3.resource('s3', 'us-east-1')
def stream_zip_file():
    # pool = ThreadPool(threads)
    start_time_main = time.time()
    start_time_stream = time.time()
    obj = s3.Object(
        bucket_name='monkey-business-dev-data',
        key='sample-files/daily/banana/large/banana.zip'
    )
    end_time_stream = time.time()
    # process_queue = multiprocessing.Queue()
    # Read the whole zip object into memory and open its first entry.
    buffer = io.BytesIO(obj.get()["Body"].read())
    output = io.BytesIO()
    print(buffer)
    z = zipfile.ZipFile(buffer)
    foo2 = z.open(z.infolist()[0])
    print(sys.getsizeof(foo2))
    line_counter = 0
    file_counter = 0
    for line in foo2:
        line_counter += 1
        output.write(line)
        if line_counter >= 5000:
            # A 5000-line chunk is complete; this is where it should be uploaded.
            file_counter += 1
            line_counter = 0
            # pool.map(upload_to_s3, (output, file_counter))
            # upload_to_s3(output, file_counter)
            # process_queue.put(output)
            output.close()
            output = io.BytesIO()
    if line_counter > 0:
        # Count the final, partially filled chunk as well.
        file_counter += 1
        # process_queue.put(output)
        # upload_to_s3(output, file_counter)
        # pool.map(upload_to_s3, (output, file_counter))
    print('Total Files: {}'.format(file_counter))
    print('Total Lines: {}'.format(line_counter))
    start_time_upload = time.time()
    end_time_upload = time.time()
    output.close()
    z.close()
    end_time_main = time.time()
    print('''
    main: {}
    stream: {}
    upload: {}
    '''.format((end_time_main - start_time_main),
               (end_time_stream - start_time_stream),
               (end_time_upload - start_time_upload)))
def upload_to_s3(output, file_name):
    # Rewind the buffer so put_object reads it from the start.
    output.seek(0)
    s3_client.put_object(
        Bucket='monkey-business-dev-data',
        Key='sample-files/daily/banana/large/{}.txt'.format(file_name),
        ServerSideEncryption='AES256',
        Body=output,
        ACL='bucket-owner-full-control'
    )
# consumer_process = multiprocessing.Process(target=data_consumer, args=(process_queue,))
# consumer_process.start()
#
#
# def data_consumer(queue):
#     while queue.empty() is False:
if __name__ == '__main__':
    stream_zip_file()
I have tried several approaches. My exact requirement is a thread pool of size 10 backed by a queue: the threads keep polling the queue for new chunks, and whenever a chunk is available for upload, a free thread (or a new one, if thread 1 is still busy uploading to S3) picks it up and uploads the file to S3, and so on. I have gone through plenty of answers here and on Google, but nothing seems to work or make sense to my feeble brain. Any ideas?
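For reference, here is a minimal sketch of that pattern built on the standard library's concurrent.futures.ThreadPoolExecutor, which already maintains the kind of internal work queue described above: submit() enqueues a chunk, and the next free worker thread picks it up and uploads it. The bucket, key, and upload parameters are copied from the question's code; the function names and the 10-worker pool size are assumptions taken from the stated requirement, not a tested solution.

import io
import zipfile
from concurrent.futures import ThreadPoolExecutor

import boto3

s3 = boto3.resource('s3', 'us-east-1')
s3_client = boto3.client('s3')  # boto3 clients are safe to share across threads


def upload_chunk(output, file_counter):
    # Worker: rewind the finished chunk and upload it as its own S3 object.
    output.seek(0)
    s3_client.put_object(
        Bucket='monkey-business-dev-data',
        Key='sample-files/daily/banana/large/{}.txt'.format(file_counter),
        ServerSideEncryption='AES256',
        Body=output,
        ACL='bucket-owner-full-control'
    )
    output.close()


def stream_zip_file():
    obj = s3.Object(
        bucket_name='monkey-business-dev-data',
        key='sample-files/daily/banana/large/banana.zip'
    )
    buffer = io.BytesIO(obj.get()["Body"].read())
    z = zipfile.ZipFile(buffer)
    entry = z.open(z.infolist()[0])
    with ThreadPoolExecutor(max_workers=10) as executor:
        output = io.BytesIO()
        line_counter = 0
        file_counter = 0
        for line in entry:
            output.write(line)
            line_counter += 1
            if line_counter >= 5000:
                file_counter += 1
                # Hand the finished chunk to the pool's queue; a free worker
                # starts uploading it while this loop keeps reading lines.
                executor.submit(upload_chunk, output, file_counter)
                output = io.BytesIO()  # the old buffer now belongs to the worker
                line_counter = 0
        if line_counter > 0:
            file_counter += 1
            executor.submit(upload_chunk, output, file_counter)
    # Leaving the `with` block waits for every pending upload to complete,
    # so the handler does not return while uploads are still in flight.
    print('Total Files: {}'.format(file_counter))


if __name__ == '__main__':
    stream_zip_file()

Note that this only parallelises the uploads; the zip entry is still read sequentially, so if the 5-minute limit is hit by the read itself, splitting the input across multiple Lambda invocations would be the next step.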