无法将JSON正文传递给基于FastAPI和Mangum的AWS API端点

2024-04-24 22:57:19 发布

您现在位置:Python中文网/ 问答频道 /正文

总结问题

给定以下处理程序:

from fastapi import FastAPI
from mangum import Mangum


from src.api.v1 import router as api_router

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Oh my god, they killed Kenny!"}

app.include_router(api_router, prefix="/api/v1")

handler = Mangum(app)

以及以下事件:

{
  "body": {
    "data": "Oh my god. They killed Kenny!"
  },
  "resource": "/{proxy+}",
  "path": "/api/v1/price_action/$RUDPA",
  "httpMethod": "POST",
  "queryStringParameters": {
    "foo": "bar"
  },
  "multiValueQueryStringParameters": {
    "foo": [
      "bar"
    ]
  },
  "pathParameters": {
    "proxy": "/path/to/resource"
  },
  "stageVariables": {
    "baz": "qux"
  },
  "headers": {
    "Content-Type": "application/json",
    "Accept": "text/html,application/xhtml+xml,application/json;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Encoding": "gzip, deflate, sdch",
    "Accept-Language": "en-US,en;q=0.8",
    "Cache-Control": "max-age=0",
    "CloudFront-Forwarded-Proto": "https",
    "CloudFront-Is-Desktop-Viewer": "true",
    "CloudFront-Is-Mobile-Viewer": "false",
    "CloudFront-Is-SmartTV-Viewer": "false",
    "CloudFront-Is-Tablet-Viewer": "false",
    "CloudFront-Viewer-Country": "US",
    "Host": "1234567890.execute-api.eu-west-2.amazonaws.com",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Custom User Agent String",
    "Via": "1.1 08f323deadbeefa7af34d5feb414ce27.cloudfront.net (CloudFront)",
    "X-Amz-Cf-Id": "cDehVQoZnx43VYQb9j2-nvCh-9z396Uhbp027Y2JvkCPNLmGJHqlaA==",
    "X-Forwarded-For": "127.0.0.1, 127.0.0.2",
    "X-Forwarded-Port": "443",
    "X-Forwarded-Proto": "https"
  },
  "multiValueHeaders": {
    "Accept": [
      "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
    ],
    "Accept-Encoding": [
      "gzip, deflate, sdch"
    ],
    "Accept-Language": [
      "en-US,en;q=0.8"
    ],
    "Cache-Control": [
      "max-age=0"
    ],
    "CloudFront-Forwarded-Proto": [
      "https"
    ],
    "CloudFront-Is-Desktop-Viewer": [
      "true"
    ],
    "CloudFront-Is-Mobile-Viewer": [
      "false"
    ],
    "CloudFront-Is-SmartTV-Viewer": [
      "false"
    ],
    "CloudFront-Is-Tablet-Viewer": [
      "false"
    ],
    "CloudFront-Viewer-Country": [
      "US"
    ],
    "Host": [
      "0123456789.execute-api.eu-west-2.amazonaws.com"
    ],
    "Upgrade-Insecure-Requests": [
      "1"
    ],
    "User-Agent": [
      "Custom User Agent String"
    ],
    "Via": [
      "1.1 08f323deadbeefa7af34d5feb414ce27.cloudfront.net (CloudFront)"
    ],
    "X-Amz-Cf-Id": [
      "cDehVQoZnx43VYQb9j2-nvCh-9z396Uhbp027Y2JvkCPNLmGJHqlaA=="
    ],
    "X-Forwarded-For": [
      "127.0.0.1, 127.0.0.2"
    ],
    "X-Forwarded-Port": [
      "443"
    ],
    "X-Forwarded-Proto": [
      "https"
    ]
  },
  "requestContext": {
    "accountId": "123456789012",
    "resourceId": "123456",
    "stage": "prod",
    "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
    "requestTime": "09/Apr/2015:12:34:56 +0000",
    "requestTimeEpoch": 1428582896000,
    "identity": {
      "cognitoIdentityPoolId": null,
      "accountId": null,
      "cognitoIdentityId": null,
      "caller": null,
      "accessKey": null,
      "sourceIp": "127.0.0.1",
      "cognitoAuthenticationType": null,
      "cognitoAuthenticationProvider": null,
      "userArn": null,
      "userAgent": "Custom User Agent String",
      "user": null
    },
    "path": "/prod/path/to/resource",
    "resourcePath": "/{proxy+}",
    "httpMethod": "POST",
    "apiId": "1234567890",
    "protocol": "HTTP/1.1"
  }
}

我收到:

{
  "errorMessage": "'dict' object has no attribute 'encode'",
  "errorType": "AttributeError",
  "stackTrace": [
    "  File \"/var/task/mangum/adapter.py\", line 173, in __call__\n    body = body.encode()\n"
  ]
}

错误指出:

            is_binary = event.get("isBase64Encoded", False)
            body = event.get("body") or b""
            if is_binary:
                body = base64.b64decode(body)
            elif not isinstance(body, bytes):
                body = body.encode()

            asgi_cycle = HTTPCycle(
                scope, body=body, text_mime_types=self.text_mime_types
            )
            response = asgi_cycle(self.app)

            return response

这可以在第150行找到

我尝试了以下方法:

payload = {
    "data": "Oh my god, they killed Kenny!"
}

payload = json.dumps(payload)
encoded = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()
res = requests.post(URL, data=encoded)

错误不再出现了。但是我想把字典直接传递到端点。这可能吗


Tags: pathapifalseappapplicationisbodyviewer