We are trying to read some of a partner's data from their S3 bucket using Spark. We set up the following code for the project:
import boto3

S3_BUCKET = 'BUCKET_NAME'
ROLE_SESSION_NAME = 'SESSION_NAME'
BASE_ROLE_ARN = 'BASE_ROLE_ARN/'
ROLE_ARN = BASE_ROLE_ARN + ROLE_NAME
DURATION_SECONDS = 3600

client = boto3.client('sts')
role = client.assume_role(
    RoleArn=ROLE_ARN,
    RoleSessionName=ROLE_SESSION_NAME,
    DurationSeconds=DURATION_SECONDS,
    ExternalId=EXTERNAL_ID
)

s3_session = boto3.session.Session(
    aws_access_key_id=role['Credentials']['AccessKeyId'],
    aws_secret_access_key=role['Credentials']['SecretAccessKey'],
    aws_session_token=role['Credentials']['SessionToken']
)

s3_credentials = s3_session.get_credentials().get_frozen_credentials()
s3_key = s3_credentials.access_key
s3_secret = s3_credentials.secret_key
s3_session_token = s3_credentials.token
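For reference, the credential plumbing above boils down to mapping the frozen STS credentials onto the four S3A settings Spark needs. A minimal sketch of that mapping as a pure helper (the function name is ours, not part of boto3 or Hadoop):

```python
def s3a_conf_from_credentials(access_key, secret_key, session_token):
    """Map temporary STS credentials onto the Hadoop S3A settings.

    The session token is only honoured when the temporary-credentials
    provider is selected, so that provider is included alongside the keys.
    """
    return {
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        "fs.s3a.session.token": session_token,
        "fs.s3a.aws.credentials.provider":
            "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
    }

# Placeholder values only; the real values come from the assume_role call above.
conf = s3a_conf_from_credentials("ACCESS_KEY", "SECRET_KEY", "SESSION_TOKEN")
for key, value in sorted(conf.items()):
    print(key, "=", value)
```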
Then we read the data with the following code:
input_path = 's3a://some_input_path/'
input_data = spark_sql_context.read.csv(input_path, header=True)
We also made sure all the relevant Spark config settings were in place:
hadoop_conf = spark_context._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", s3_key)
hadoop_conf.set("fs.s3a.secret.key", s3_secret)
hadoop_conf.set(
    "fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"
)
hadoop_conf.set("fs.s3a.session.token", s3_session_token)
But when we try to read the data, we see the following exception:
: java.nio.file.AccessDeniedException: s3a://the_input_path: getFileStatus on s3a://the_input_path: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: F08F9B987FF8DED9; S3 Extended Request ID: TRjfFjALAk7phRDxKdUlucY4yocQY2mNO4r7N6Qf9fSDzSa+TpZfimwbAzXdU+s11BBLBblfgik=), S3 Extended Request ID: TRjfFjALAk7phRDxKdUlucY4yocQY2mNO4r7N6Qf9fSDzSa+TpZfimwbAzXdU+s11BBLBblfgik=:403 Forbidden
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2184)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1683)
at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:2976)
This is all the stranger because the following snippet works just fine:
import os
import boto3
import sys

ROLE_NAME = 'ROLE_NAME'
EXTERNAL_ID = 'EXTERNAL_ID'
S3_BUCKET = 'BUCKET_NAME'

# ------------------------------------------------ DO NOT ALTER BELOW ------------------------------------------------ #

ROLE_SESSION_NAME = 'SESSION_NAME'
BASE_ROLE_ARN = 'BASE_ROLE_ARN'
ROLE_ARN = BASE_ROLE_ARN + ROLE_NAME
DURATION_SECONDS = 3600

client = boto3.client('sts')
role = client.assume_role(
    RoleArn=ROLE_ARN,
    RoleSessionName=ROLE_SESSION_NAME,
    DurationSeconds=DURATION_SECONDS,
    ExternalId=EXTERNAL_ID
)

session = boto3.session.Session(
    aws_access_key_id=role['Credentials']['AccessKeyId'],
    aws_secret_access_key=role['Credentials']['SecretAccessKey'],
    aws_session_token=role['Credentials']['SessionToken']
)

S3 = session.resource('s3')
my_bucket = S3.Bucket(S3_BUCKET)
for object_summary in my_bucket.objects.filter(Prefix='SOME_PREFIX'):
    print(object_summary.key)
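To rule out the two code paths simply pointing at different objects, one cross-check (a sketch; `split_s3a_path` is our own helper, not a boto3 or Spark API) is to split the failing s3a URI into its bucket and key prefix and feed those into the boto3 listing that works:

```python
from urllib.parse import urlparse

def split_s3a_path(path):
    """Split an s3a:// URI into (bucket, key prefix) so the boto3
    listing above can be run against exactly the path Spark fails on."""
    parsed = urlparse(path)
    return parsed.netloc, parsed.path.lstrip('/')

# Placeholder path matching the question's naming.
bucket, prefix = split_s3a_path('s3a://some_input_path/SOME_PREFIX/')
print(bucket)  # some_input_path
print(prefix)  # SOME_PREFIX/
```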
Any idea why we are seeing this exception when trying to read files from the S3 path with Spark? Are we missing something?