How to spin up a new thread when there is data to process in the queue?


I have a function that reads zipped stream data into a byte buffer, and from that buffer I build chunks of 5000 lines each. Now I am trying to write those chunks back to an S3 bucket as separate files. Because I am running on AWS Lambda, I cannot let a single thread do all the work: Lambda times out after its 5-minute limit. Coming from Java, the threading implementation would be straightforward, but in Python I am confused about how to run a thread pool to handle the upload-to-S3 part of the process. Here is my code:

import io
import zipfile
import boto3
import sys
import multiprocessing
# from multiprocessing.dummy import Pool as ThreadPool
import time


s3_client = boto3.client('s3')
s3 = boto3.resource('s3', 'us-east-1')


def stream_zip_file():
    # pool = ThreadPool(threads)
    start_time_main = time.time()
    start_time_stream = time.time()
    obj = s3.Object(
        bucket_name='monkey-business-dev-data',
        key='sample-files/daily/banana/large/banana.zip'
    )
    end_time_stream = time.time()
    # process_queue = multiprocessing.Queue()
    buffer = io.BytesIO(obj.get()["Body"].read())
    output = io.BytesIO()
    print(buffer)
    z = zipfile.ZipFile(buffer)
    # open the first member of the zip as a file-like stream
    foo2 = z.open(z.infolist()[0])
    print(sys.getsizeof(foo2))
    line_counter = 0
    file_counter = 0
    for line in foo2:
        line_counter += 1
        output.write(line)
        if line_counter >= 5000:
            file_counter += 1
            line_counter = 0
            # pool.map(upload_to_s3, (output, file_counter))
            # upload_to_s3(output, file_counter)
            # process_queue.put(output)
            output.close()  # note: the chunk is discarded here before any upload runs
            output = io.BytesIO()
    if line_counter > 0:
        # process_queue.put(output)
        # upload_to_s3(output, file_counter)
        # pool.map(upload_to_s3, args =(output, file_counter))
        output.close()
    print('Total Files: {}'.format(file_counter))
    print('Total Lines: {}'.format(line_counter))
    start_time_upload = time.time()

    end_time_upload = time.time()

    output.close()
    z.close()
    end_time_main = time.time()

    print('''
    main: {}
    stream: {}
    upload: {}
    '''.format((end_time_main-start_time_main),(end_time_stream-start_time_stream),(end_time_upload-start_time_upload)))


def upload_to_s3(output, file_name):
    output.seek(0)
    s3_client.put_object(
        Bucket='monkey-business-dev-data', Key='sample-files/daily/banana/large/{}.txt'.format(file_name),
        ServerSideEncryption='AES256',
        Body=output,
        ACL='bucket-owner-full-control'
    )

#     consumer_process = multiprocessing.Process(target=data_consumer, args=(process_queue))
#     consumer_process.start()
#
#
# def data_consumer(queue):
#     while queue.empty() is False:



if __name__ == '__main__':
    stream_zip_file()

I have tried several approaches by now. My specific requirement is a thread pool of size 10 with a queue that those threads keep watching: whenever a chunk is put on the queue, a free thread picks it up and starts uploading it to S3, while the main code keeps feeding the queue with new chunks; if a chunk is available and thread 1 is still busy uploading to S3, another thread should start automatically and upload that file, and so on. I have looked through many answers here and on Google, but nothing seems to work or make sense to my feeble brain. Any ideas?
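For reference, here is a minimal sketch of the pattern I am after, using the standard-library concurrent.futures.ThreadPoolExecutor instead of the commented-out multiprocessing.dummy pool. The function names upload_chunk and stream_and_upload are made up for illustration; the bucket and key layout are taken from my code above, and whether this actually stays under the Lambda time limit is exactly what I am unsure about:

import io
import concurrent.futures

import boto3

# boto3 low-level clients are documented as thread-safe (resources are not),
# so a single client can be shared by all worker threads.
s3_client = boto3.client('s3')

BUCKET = 'monkey-business-dev-data'
KEY_TEMPLATE = 'sample-files/daily/banana/large/{}.txt'


def upload_chunk(chunk_bytes, file_number):
    # Each task receives its own bytes snapshot, so nothing mutable is shared.
    s3_client.put_object(
        Bucket=BUCKET,
        Key=KEY_TEMPLATE.format(file_number),
        ServerSideEncryption='AES256',
        Body=chunk_bytes,
        ACL='bucket-owner-full-control'
    )
    return file_number


def stream_and_upload(zip_member, pool_size=10, chunk_lines=5000):
    futures = []
    # The executor keeps its own internal work queue: submit() enqueues a
    # chunk, and whichever of the pool_size threads is free picks it up.
    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as pool:
        output = io.BytesIO()
        line_counter = 0
        file_counter = 0
        for line in zip_member:
            output.write(line)
            line_counter += 1
            if line_counter >= chunk_lines:
                file_counter += 1
                line_counter = 0
                futures.append(pool.submit(upload_chunk, output.getvalue(), file_counter))
                output = io.BytesIO()
        if line_counter > 0:  # upload the final partial chunk
            file_counter += 1
            futures.append(pool.submit(upload_chunk, output.getvalue(), file_counter))
        # Surface any upload error instead of silently swallowing it.
        for future in concurrent.futures.as_completed(futures):
            future.result()
    return file_counter

Calling stream_and_upload(foo2) would then replace the inner loop of stream_zip_file; submit() already gives the "one producer feeds the queue, free threads pick up chunks" behaviour I described, without hand-rolling a queue and consumer process.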


Tags: to, import, output, stream, s3, time, main, line