Querying an Athena table in an AWS Glue Python shell job

Published 2024-04-27 04:56:41


Python shell jobs were recently introduced in AWS Glue. The announcement mentions:

You can now use Python shell jobs, for example, to submit SQL queries to services such as ... Amazon Athena ...

OK. There is an example here that reads data from an Athena table.

However, it uses Spark rather than a Python shell. The libraries that are normally available to the Spark job type are not available there, and I get this error:

ModuleNotFoundError: No module named 'awsglue.transforms'

How can I rewrite the code above so that it runs as a Python shell job?


2 Answers

The problem is that the Python shell job type comes with its own, limited set of built-in libraries.

I achieved my goal by using Boto 3 to run the query and Pandas to read the result into a DataFrame.

Here is the code snippet:

import time

import boto3
import pandas as pd

s3_client = boto3.client('s3')
athena_client = boto3.client(service_name='athena', region_name='us-east-1')
bucket_name = 'bucket-with-csv'
print('Working bucket: {}'.format(bucket_name))

def run_query(client, query):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={ 'Database': 'sample-db' },
        ResultConfiguration={ 'OutputLocation': 's3://{}/fromglue/'.format(bucket_name) },
    )
    return response

def validate_query(client, query_id):
    terminal_states = ["FAILED", "SUCCEEDED", "CANCELLED"]
    response = client.get_query_execution(QueryExecutionId=query_id)
    # poll until the query reaches a terminal state; sleep between
    # calls so we do not hammer the API and get throttled
    while response["QueryExecution"]["Status"]["State"] not in terminal_states:
        time.sleep(1)
        response = client.get_query_execution(QueryExecutionId=query_id)

    return response["QueryExecution"]["Status"]["State"]

def read(query):
    print('start query: {}\n'.format(query))
    qe = run_query(athena_client, query)
    qstate = validate_query(athena_client, qe["QueryExecutionId"])
    print('query state: {}\n'.format(qstate))

    # Athena writes the result set as a CSV named after the query execution ID
    file_name = "fromglue/{}.csv".format(qe["QueryExecutionId"])
    obj = s3_client.get_object(Bucket=bucket_name, Key=file_name)
    return pd.read_csv(obj['Body'])

# table names containing a hyphen must be double-quoted in Athena SQL
time_entries_df = read('SELECT * FROM "sample-table"')
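Downloading the result CSV from S3 is one option; Athena can also return rows directly through the boto3 `get_query_results` call. As a sketch, the helper below converts a `GetQueryResults`-shaped response into a DataFrame. The `sample` dict is a hand-written illustration of that response shape, not real query output:

```python
import pandas as pd

def results_to_df(response):
    """Convert a boto3 Athena GetQueryResults response into a DataFrame.

    Athena returns every cell as a string under 'VarCharValue', and the
    first row of the result set is the header row.
    """
    rows = response['ResultSet']['Rows']
    header = [col.get('VarCharValue') for col in rows[0]['Data']]
    data = [
        [col.get('VarCharValue') for col in row['Data']]
        for row in rows[1:]
    ]
    return pd.DataFrame(data, columns=header)

# Hypothetical response illustrating the GetQueryResults shape
sample = {
    'ResultSet': {
        'Rows': [
            {'Data': [{'VarCharValue': 'id'}, {'VarCharValue': 'name'}]},
            {'Data': [{'VarCharValue': '1'}, {'VarCharValue': 'alice'}]},
            {'Data': [{'VarCharValue': '2'}, {'VarCharValue': 'bob'}]},
        ]
    }
}

df = results_to_df(sample)
print(df)
```

In a real job you would pass in the response from `athena_client.get_query_results(QueryExecutionId=...)`, paginating when the result set exceeds 1000 rows. Note that all values arrive as strings, so you may still want to cast columns afterwards.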

I have been using Glue for a few months, and this is what I use:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# "com.databricks.spark.csv" is the legacy alias for Spark's built-in CSV source
data_frame = spark.read.format("csv")\
    .option("header", "true")\
    .load(<S3 PATH TO THE CSVs BACKING THE ATHENA TABLE - STRING>)
