AWS Glue将GET(RESTAPI)请求的Json响应转换为DataFrame/DyanamicFramce,并将其存储在s3 bucket中

2024-05-19 02:13:51 发布

您现在位置:Python中文网/ 问答频道 /正文

headersAPI = {
     'Content-Type': 'application/json'
      , 'accept': 'application/json'
       ,'Authorization': 'Bearer XXXXXXXXXXXXXXXXXXXXXXXXXX',
}
skill_response=requests.get("XXXXXX",headers=headersAPI),headers=headersAPI)

log.info(skill_response.text)
skill_json=skill_response.json()
print(skill_json)  ##print the json data and verified
    
log.info('skills data')
log.info(skill_json["status"]) 
        
DataSink0 = glueContext.write_dynamic_frame.from_options(frame =
   skill_json, connection_type = "s3", format = "csv", connection_options=
   {"path": "s3://xxxxx/", "partitionKeys": []}, transformation_ctx= "DataSink0")

job.commit()

类型错误:帧\或\ dfc必须是DynamicFrame或DynamicFrameCollection。得到<;类“dict”>

在写入S3时,我遇到了以下错误:'dict' object has no attribute '_jdf'


Tags: infologjsondatas3applicationresponseconnection
1条回答
网友
1楼 · 发布于 2024-05-19 02:13:51

通过首先从响应字符串(讨论here)创建数据帧,然后将此数据帧转换为动态帧,可以将JSON响应转换为动态帧

这个例子应该适用于:

import requests
from awsglue.job import Job
from pyspark.context import SparkContext

from awsglue import DynamicFrame
from awsglue.context import GlueContext

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

r = requests.get(url='https://api.github.com/users?since=100')

df = spark.read.json(sc.parallelize([r.text]))

dynamic_frame = DynamicFrame.fromDF(
    df, glue_ctx=glueContext, name="df"
)

#dynamic_frame.show()

DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="s3", format="csv",
    connection_options={"path": "s3://xxxxx/",
                        "partitionKeys": []},
    transformation_ctx="DataSink0")
job.commit()

相关问题 更多 >

    热门问题