Python的request()在尝试从API获取文件时返回响应400

2024-06-02 05:36:31 发布

您现在位置:Python中文网/ 问答频道 /正文

这是我第一次使用API。我编写了一些代码,使用一个“get”调用从useapping的API获取数据,它可以正常工作。我现在正试图从另一个只有“post”功能的端点获取数据。下面的代码返回“400”响应。我不确定我需要改变什么才能产生输出。我不知道这是否重要,但这个端点会生成一个zip文件

import requests

payload = {"award_levels":["prime_awards"],"filters":{"award_types":["contracts","direct_payments","grants","idvs","loans","other_financial_assistance"],"agency":"United States Mint","date_type":"action_date","date_range":{"start_date":"2019-01-01","end_date":"2019-01-31"}},"columns":[],"file_format":"csv"}

response = requests.post('https://api.usaspending.gov/api/v2/bulk_download/awards/', params = payload).json()

提前谢谢

更新:

这就是我最后用的。我的日期是动态的,因此我可以使用任务调度器每月自动执行此拉取操作。如果要提取特定日期,请删除标头声明中的日期变量引用并运行

pl_str1 = """{
"filters": {
"prime_award_types": [
"A",
"B",
"C",
"D",
"IDV_A",
"IDV_B",
"IDV_B_A",
"IDV_B_B",
"IDV_B_C",
"IDV_C",
"IDV_D",
"IDV_E",
"02",
"03",
"04",
"05",
"10",
"06",
"07",
"08",
"09",
"11"],
"agency": 54,
"date_type": "action_date","""
# Inserting date_range variable into API call
pl_str2 =  '"date_range": {"start_date":'  + date_3Months_prior +', "end_date":' + date_today + '}'
pl_str3 = """},
"columns": [],
"file_format": "csv"}"""

desired_payload = pl_str1 + pl_str2 + pl_str3

# $$$$$$$$$$$$$$$$$$$$$$$$$$$ REQUESTING DESIRED INFO FROM THE API HERE $$$$$$        
url = 'https://api.usaspending.gov/api/v2/bulk_download/awards/'
headers = {'Content-Type': 'application/json'}
resp = requests.post(url, headers=headers, data=desired_payload)
if resp.status_code == 200:
    print('success')
    print(resp.content)
else:
    print('fail')
    
# API returns a zip file; grabbing that   
# This will turn the API response into a string I can use regex on
test = resp.content.decode('UTF-8')
# Extracting url for zip we want to retrieve
test2 = re.findall(r"file_url\S+\.zip", test)[0]
test3 = re.findall(r"https\S+\.zip", test2)[0]

import zipfile
import io
import time
r = requests.get(test3)
# add sleep in case it takes a while for the API to return stuff; not sure if 
# but just in case
time.sleep(5)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall()

Tags: importapiurldatezippostrequestsresp
3条回答

params是URL中的查询参数

例如

www.stackoverflow.com?parameter=test

这可以表示为

payload = {
     "parameter": "test"
}

POST请求有一个body,这与从表单发布时发送的数据相同,您正在发送一个body,您在请求模块函数.post中有一些参数处理正文,您可以使用data=payloadjson=payloadSee这方面的官方文档

当您在浏览器中运行此url(https://api.usaspending.gov/api/v2/bulk_download/awards/)并用您的请求填写表单时。你会得到这样的回应 { “详细信息”:“缺少一个或多个必需的正文参数:主要奖励类型或次要奖励类型” }

添加传单、数据范围、数据类型、机构、主要奖励类型 您的请求应如下所示:

{
"filters": {
    "date_range": {
        "start_date": "2019-01-01",
        "end_date": "2019-12-31"
    },
    "date_type": "action_date",
    "agency": 50,
    "prime_award_types": [
        "02",
        "03",
        "04",
        "05",
        "A",
        "B",
        "C",
        "D"
    ],
    "award_levels": [
        "prime_awards"
    ],
    "filters": {
        "award_types": [
            "contracts",
            "direct_payments",
            "grants",
            "idvs",
            "loans",
            "other_financial_assistance"
        ],
        "agency": "United States Mint",
        "date_type": "action_date",
        "date_range": {
            "start_date": "2019-01-01",
            "end_date": "2019-01-31"
        }
    },
    "columns": [],
    "file_format": "csv"
}

}

在函数中获取表单数据,并随请求将其发送给其他函数

import requests
from flask import Flask, return , request, 

@videos.route("/ajaxprocess", methods=["GET","POST"])
def ajaxprocess():
    if request.method =="POST":

       text = request.form.get('text_text')
        font = request.form.get('font_text')
        color = request.form.get('color_text')
        start_text = request.form.get('start_text')
        end_text = request.form.get('end_text')
        video_id = request.form.get('vid')
        all_data = {'status':'OK','text':text,'font':font,'color':color,'start_text':start_text,'end_text':end_text,'vid':video_id}
        r = requests.post("http://127.0.0.1:5000/videos/test_text",json=all_data)
        print(r)
        return redirect(url_for("videos.show_videos"))
        

@videos.route("/test_text", methods = ['POST'])

def test_text():
   
    data = request.get_json("")
    print(data)

    return "success!"

相关问题 更多 >