azure函数服务总线python读取大量消息

2024-03-29 13:41:29 发布

您现在位置:Python中文网/ 问答频道 /正文

在我的python Azure函数中(使用ServiceBustigger)是否有机会读取一堆消息

这个问题其实很简单。我做了很多尝试,但Azure功能框架在读取来自Azure服务总线的消息后,似乎每次调用一条消息,而不管预取值有多大或设置了任何参数

我想处理整个消息获取过程,因为当您必须处理数百万条消息时,一次处理一条或多条消息会产生不同

这是我的配置:

function.json

{
    "scriptFile": "main.py",
    "entryPoint": "function_handler",
    "bindings": [{
        "name": "msg",
        "type": "serviceBusTrigger",
        "direction": "in",
        "topicName": "topic-name",
        "subscriptionName": "topic-subscription-name"
    }]
}

main.py

import azure.functions as func
import json

def function_handler(msg: func.ServiceBusMessage):    
    print(str(msg))

host.json

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[1.*, 2.0.0)"
  },
  "extensions": {
    "serviceBus": {
      "prefetchCount": 100,
      "messageHandlerOptions": {
          "autoComplete": true, 
          "maxConcurrentCalls": 6,
          "maxAutoRenewDuration": "00:05:00"
      }
    }
  }
}

Tags: 函数namepyimportjson消息topicmain
1条回答
网友
1楼 · 发布于 2024-03-29 13:41:29

我认为您可能需要一个异步服务总线触发器:

__init__.py

import logging
import asyncio

import azure.functions as func

async def hello():
    await asyncio.sleep(2)

async def main(msg: func.ServiceBusMessage):
    logging.info('Python ServiceBus queue trigger processed message: %s',
                 msg.get_body().decode('utf-8'))
    await asyncio.gather(hello(), hello())
    logging.info(msg.get_body().decode('utf-8')+" is been done.")

host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[1.*, 2.0.0)"
  },
  "extensions": {"serviceBus": {"messageHandlerOptions": {"maxConcurrentCalls": 5}}}
}

我可以得到:

enter image description here

相关问题 更多 >