使用托管标识的 Azure 事件网格输出绑定 Python v2,出现 'EventGridAttribute.TopicEndpointUri' 错误

0 投票
1 回答
85 浏览
提问于 2025-04-14 16:45

我正在使用 Python 2 的 Azure 事件网格输出绑定,参考的是微软官方文档中的代码。我想用托管身份来代替主题的端点和密钥。但是在这里

@app.event_grid_output(
    arg_name="outputEvent",
    topic_endpoint_uri="MyEventGridTopicUriSetting",
    topic_key_setting="MyEventGridTopicKeySetting")

我应该怎么做呢?我一直收到下面的错误信息

无法解析属性 'EventGridAttribute.TopicEndpointUri' 的值。

import logging
import azure.functions as func
import datetime

@app.function_name(name="eventgrid_output")
@app.route(route="eventgrid_output")
@app.event_grid_output(
    arg_name="outputEvent",
    topic_endpoint_uri="MyEventGridTopicUriSetting",
    topic_key_setting="MyEventGridTopicKeySetting")
def eventgrid_output(eventGridEvent: func.EventGridEvent, 
         outputEvent: func.Out[func.EventGridOutputEvent]) -> None:

    logging.log("eventGridEvent: ", eventGridEvent)

    outputEvent.set(
        func.EventGridOutputEvent(
            id="test-id",
            data={"tag1": "value1", "tag2": "value2"},
            subject="test-subject",
            event_type="test-event-1",
            event_time=datetime.datetime.utcnow(),
            data_version="1.0"))

1 个回答

0

要使用托管身份来实现事件网格触发的Azure函数,你需要使用连接属性。

下面是函数代码:

app = func.FunctionApp()

@app.function_name(name="eventgrid_out")
@app.event_grid_trigger(arg_name="eventGridEvent")
@app.event_grid_output(
    arg_name="outputEvent",
connection="mytesteventgrid__topicEndpointUri")
def eventgrid_output(eventGridEvent: func.EventGridEvent, 
         outputEvent: func.Out[func.EventGridOutputEvent]) -> None:

    logging.info("eventGridEvent: %s", eventGridEvent)

    outputEvent.set(
        func.EventGridOutputEvent(
            id="test-id",
            data={"tag1": "value1", "tag2": "value2"},
            subject="test-subject",
            event_type="test-event-1",
            event_time=datetime.datetime.utcnow(),
            data_version="1.0"))

local.settings.json:

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "mytesteventgrid__topicEndpointUri":"https://<topic_name>.southindia-1.eventgrid.azure.net/api/events"
  }
}
  • 在使用托管身份时,给你的身份分配 EventGrid Contributor, EventGrid Data Sender 角色,以便在运行时访问事件网格主题。

发送事件:

在这里输入图片描述

在这里输入图片描述

参考资料:

https://learn.microsoft.com/en-gb/answers/questions/1615439/does-eventgrid-output-binging-v2-support-managed-i

撰写回答