How to read and overwrite a file in AWS S3 using Lambda and Python?

Published 2024-03-29 04:53:48


I am trying the following approach. However, when I overwrite the file that triggers the Lambda, it goes into a loop because of that. Could you please help me? The code snippet I use for the Lambda is also pasted below.

Task

  1. Read a file in the folder named "FolderA" when a file is uploaded to that folder
  2. Then truncate a particular column whose value is greater than 10 characters
  3. Then upload the file back to the same folder, but unfortunately it ends up in a loop because of the Lambda invoke
  4. Tried moving it to another folder named TrimmedFile instead; that works fine without any loop

Can someone tell me how to read, edit, and save the file back into the same folder it was invoked from?

    import json
    import urllib.parse
    import boto3
    import os
    import csv

    print('Loading function')
    s3 = boto3.client('s3')

    def lambda_handler(event, context):
        # Get the bucket and object key from the S3 event
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
        try:
            print("file name " + key)

            # Read the uploaded CSV from S3
            file_key = key
            csvfile = s3.get_object(Bucket=bucket, Key=file_key)
            csvcontent = csvfile["Body"].read().decode("utf-8")
            file = csvcontent.split("\n")
            csv_reader = csv.reader(file)
            line_count = 0
            colindex = ''
            content = []
            contentstring = ''

            # Keep a backup copy of the original file under BKP/
            s33 = boto3.resource('s3')
            copy_source = {
                'Bucket': bucket,
                'Key': file_key
            }
            new_bucket = s33.Bucket(bucket)
            print(file_key)
            print(bucket)
            src_folder = "FolderA/"
            new_filekey = file_key.replace(src_folder, "")
            print(new_filekey)
            new_bucket.copy(copy_source, 'BKP/' + new_filekey)

            # Truncate the ColToTruncate column when its value is 10 characters or longer
            for row in csv_reader:
                if row:
                    row = list(map(str.strip, row))
                    if line_count == 0:
                        if 'ColToTruncate' in row:
                            colindex = row.index('ColToTruncate')
                            line_count += 1
                        else:
                            print('No ColToTruncate column found in ' + file_key)
                            return 'No ColToTruncate column found in ' + file_key
                    else:
                        if len(row[colindex]) >= 10:
                            row[colindex] = row[colindex][0:2]
                        line_count += 1
                    content.append(row)
                    contentstring += ', '.join(row)
                    contentstring = contentstring + '\n'

            uploadByteStream = bytes(contentstring.encode('utf-8'))
            #new_key = 'TrimmedFiles/' + new_filekey   # writing to TrimmedFiles/ instead avoids the loop
            # Writing back to the same key re-triggers this Lambda and causes the loop
            s3.put_object(Bucket=bucket, Key=file_key, Body=uploadByteStream)
            return True
        except Exception as e:
            print(e)
            print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
            raise e

2 Answers

I believe you have created an event trigger on S3 and associated it with the Lambda; when you replace the file, the Lambda is triggered again and it becomes a loop.

There are two ways to handle it:

1. Configure a PUT or POST event type (whichever fits your case) to trigger the Lambda. Then save the updated file to a different location and copy it back to the original location. When you do it this way, S3 generates an "s3:ObjectCreated:Copy" event, which will not invoke the Lambda again.

    # Copying the file from the secondary location back to the original location
    copy_sr = {
        "Bucket": bucket,
        "Key": file_key_copy
    }

    s3_resource.meta.client.copy(copy_sr, final_bucket, file_key_copy)

    # Deleting the file from the secondary location
    s3_client.delete_object(Bucket=bucket, Key=file_key_copy)
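
For context on the first option, the trigger itself can be narrowed to Put events (and to the FolderA/ prefix from the question) when the notification is set up. Below is a minimal sketch of such a configuration with boto3; it is not part of the original answer, and the bucket name, Lambda ARN, and configuration Id are placeholders.

    # Sketch only: restrict the S3 trigger to Put events on the FolderA/ prefix,
    # so copying the edited file back into place does not re-invoke the Lambda.
    # Bucket name, Lambda ARN, and Id below are placeholders.
    import boto3

    s3_client = boto3.client('s3')
    s3_client.put_bucket_notification_configuration(
        Bucket='my-bucket',
        NotificationConfiguration={
            'LambdaFunctionConfigurations': [{
                'Id': 'trim-on-put-only',
                'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:TrimColumn',
                'Events': ['s3:ObjectCreated:Put'],
                'Filter': {'Key': {'FilterRules': [
                    {'Name': 'prefix', 'Value': 'FolderA/'}
                ]}}
            }]
        }
    )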

2. Use an SQS queue and configure it so that a message received twice within a specified period (depending on how frequently the file is updated) is not processed again; see the sketch below.
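
One way to read this suggestion is to forward the S3 event into an SQS FIFO queue and deduplicate on the object key, so a repeated event for the same key within the deduplication window (fixed at 5 minutes for FIFO queues) is dropped. The sketch below is my interpretation, not the answerer's code; the queue URL is a placeholder.

    # Sketch only: push the S3 event into an SQS FIFO queue, deduplicating on the
    # object key so a second event for the same key within 5 minutes is ignored.
    # The queue URL is a placeholder.
    import json
    import boto3

    sqs = boto3.client('sqs')

    def forward_to_queue(bucket, key):
        sqs.send_message(
            QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789012/trim-files.fifo',
            MessageBody=json.dumps({'bucket': bucket, 'key': key}),
            MessageGroupId='s3-events',
            MessageDeduplicationId=key.replace('/', '-')  # dedup id derived from the key
        )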

This is to demonstrate how to read a file and replace it after editing. It can act as skeleton code.

    import boto3
    import io


    client = boto3.client('s3')
    res = boto3.resource('s3')

    def lambda_handler(event, context):

        file_key = event['file_key']
        file_obj = res.Object("bucket_name", file_key)

        # Fetch the current file content
        content_obj = file_obj.get()['Body'].read().decode('utf-8')

        # Delete the old file
        res.Object("bucket_name", file_key).delete()

        ###### Perform your operation on content_obj and save the result in new_data ######

        new_file = io.BytesIO(new_data.encode())

        # Upload the edited file at the exact same location
        client.upload_fileobj(new_file, "bucket_name", file_key)
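
To tie this skeleton back to the task in the question, one possible way to fill in the operation step is sketched below. It is my addition, not part of the answer, and it assumes the column is named ColToTruncate as in the question; using csv.reader and csv.writer preserves quoting instead of re-joining rows by hand.

    # Sketch of the "perform your operation" step, assuming the ColToTruncate
    # column from the question. Values of 10 or more characters are cut down.
    import csv
    import io

    def truncate_column(csv_text, column='ColToTruncate', max_len=10, keep=2):
        rows = list(csv.reader(io.StringIO(csv_text)))
        header = rows[0]
        idx = header.index(column)          # raises ValueError if the column is missing
        for row in rows[1:]:
            if row and len(row[idx]) >= max_len:
                row[idx] = row[idx][:keep]  # keep only the first few characters
        out = io.StringIO()
        csv.writer(out).writerows(rows)
        return out.getvalue()

    # new_data = truncate_column(content_obj)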
