No HTTP triggers found in Azure Function

0 votes
1 answer
88 views
Asked 2025-04-12 08:15

I wrote an Azure Function in Python in VS Code that transforms an Excel workbook and stores the result in Azure Blob Storage. Everything works fine when I test locally, but after I deploy the function to Azure, the portal reports "No HTTP triggers found". This is the code I am using:

import logging
import azure.functions as func
from azure.storage.blob import  BlobServiceClient, BlobClient, ContainerClient
import pandas as pd
from datetime import datetime
from azure.storage.blob import BlobServiceClient, generate_container_sas,  ContainerSasPermissions, generate_blob_sas, BlobSasPermissions, ContainerClient
from azure.storage.blob import generate_blob_sas
from datetime import timedelta
import os

logging.info('fn started')

def excel_transform():
    logging.info('transform fn triggered')

    ACCOUNTNAME = "your-account-name"
    KEY = "your-account-key"
    TARGET_CONTAINER = "ur-target-container" # uncomment and give target container when used in azure
    SOURCE_CONTAINER = "ur-source-container"

    # Create a BlobServiceClient using your account URL and credentials
    source_blob_service_client = BlobServiceClient (account_url=f" ", credential=KEY))

    # Get a reference to the Blob Container
    source_container_client = source_blob_service_client.get_container_client(SOURCE_CONTAINER)

    # List all blobs (files) in the source container
    source_blobs = source_container_client.list_blobs()

    # Define the desired permissions and expiration for the SAS tokens
permission=BlobSasPermissions(read=True,write=True,add=True,create=True,execute=True,move=True,ownership=True,permissions=True),
expiry=datetime.utcnow() + timedelta(hours=6)

    # Create and print SAS tokens and URLs for each blob(file)
    chem_daily = pd.DataFrame()

    def generate_sas_token_url(blob,container):  # To generate SAS token and URL
        logging.info('sas token created')

        # Generate the SAS token for the current blob
        sas_token = generate_blob_sas(
            container_name=container,
            blob_name= blob,
            account_name=ACCOUNTNAME,
            account_key=KEY,  # Use account_key for generating a user delegation SAS token
            permission=BlobSasPermissions(read=True,write=True,add=True,create=True,execute=True,move=True,ownership=True,permissions=True),
            expiry=datetime.utcnow() + timedelta(hours=6)
        )

        # Create the URL with the SAS token
        blob_url_with_sas =f"https://{ACCOUNTNAME}.blob.core.windows.net/{container}/{blob}?{sas_token}"
        blob_url_with_sas = blob_url_with_sas.replace(" ", "%20")
        return(blob_url_with_sas)

    # To read and transform files from source container
    def list_source_blob(): return source_container_client.list_blobs()
    logging.info('blobs listed')
    for blob in list_source_blob():
        blob_name = blob.name
        source_url = generate_sas_token_url(blob_name,SOURCE_CONTAINER)
        logging.info(f'{blob_name} url created')

    list_source_blob()

        # To obtain LocationId from filename
    excel_file_name = str(blob_name)
    file_part = excel_file_name.split('-')
    LocationId_1 = file_part[1].split('.')
    LocationId = LocationId_1[0]

    ]

     # To append corresponding dataframes
        if sheet_name == 'process - Daily':
            process_daily = pd.concat([process_daily,df])

    # List of all dataframes
    df_list = [process_daily]

    process_daily.attrs['name']='ProcessConsumption'

    target_container_client = source_blob_service_client.get_container_client(TARGET_CONTAINER)

    for df in df_list:
        file_name = f"excel_transformed/{df.attrs['name']}.csv"   # Filename created
        output_csv = df.to_csv(index = False)          # Converted df to csv file

        target_blob_url = generate_sas_token_url(file_name,TARGET_CONTAINER)
        target_blob_client = BlobClient.from_blob_url(target_blob_url)
        target_blob_client.upload_blob(output_csv, overwrite = True)

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    excel_transform()

    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            name = req_body.get('name')

    if name:
        return func.HttpResponse(f"Hello, {name}. This HTTP triggered function executed successfully.")
    else:
        return func.HttpResponse(
             "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
             status_code=200
        )

What I have already tried:

1. Downgraded Azure Functions Core Tools
2. Added the required settings both locally and in the Azure Function App configuration

1 Answer

0

I tried deploying the same code given in the question, but it hit some build errors, so I could not deploy the function to Azure either.

(screenshot: build errors during deployment)

I modified the code as shown below; the function now deploys to Azure successfully and the triggers sync.

import logging
import os
from datetime import datetime, timedelta

import azure.functions as func
import pandas as pd
from azure.storage.blob import (BlobClient, BlobSasPermissions,
                                BlobServiceClient, ContainerClient,
                                ContainerSasPermissions, generate_blob_sas,
                                generate_container_sas)

logging.info('fn started')

def excel_transform():
    logging.info('transform fn triggered')

    ACCOUNTNAME = "your-account-name"
    KEY = "your-account-key"    
    TARGET_CONTAINER = "ur-target-container" # uncomment and give target container when used in azure
    SOURCE_CONTAINER = "ur-source-container"
                                         
    # Create a BlobServiceClient using your account URL and credentials
    source_blob_service_client = BlobServiceClient(account_url="<storage_account_url>", credential=KEY)

    # Get a reference to the Blob Container
    source_container_client = source_blob_service_client.get_container_client(SOURCE_CONTAINER)  

    # List all blobs (files) in the source container
    source_blobs = source_container_client.list_blobs()

    # Define the desired permissions and expiration for the SAS tokens
    permission = BlobSasPermissions(read=True, write=True, add=True, create=True, execute=True, move=True, ownership=True, permissions=True)
    expiry = datetime.utcnow() + timedelta(hours=6)

    # Create and print SAS tokens and URLs for each blob(file)    
    chem_daily = pd.DataFrame()   

    def generate_sas_token_url(blob,container):  # To generate SAS token and URL

        logging.info('sas token created')

        # Generate the SAS token for the current blob

        sas_token = generate_blob_sas(
            container_name=container,
            blob_name=blob,
            account_name=ACCOUNTNAME,
            account_key=KEY,  # signed with the account key (a user delegation SAS would use a user delegation key instead)
            permission=BlobSasPermissions(read=True, write=True, add=True, create=True, execute=True, move=True, ownership=True, permissions=True),
            expiry=datetime.utcnow() + timedelta(hours=6)
        )

        # Create the URL with the SAS token

        blob_url_with_sas = f"https://{ACCOUNTNAME}.blob.core.windows.net/{container}/{blob}?{sas_token}"
        blob_url_with_sas = blob_url_with_sas.replace(" ", "%20")
        return blob_url_with_sas

    # To read and transform files from source container
    def list_source_blob():
        return source_container_client.list_blobs()

    logging.info('blobs listed')
    for blob in list_source_blob():
        blob_name = blob.name
        source_url = generate_sas_token_url(blob_name, SOURCE_CONTAINER)
        logging.info(f'{blob_name} url created')

    # To obtain LocationId from filename (uses the last blob listed above)
    excel_file_name = str(blob_name)
    file_part = excel_file_name.split('-')
    LocationId_1 = file_part[1].split('.')
    LocationId = LocationId_1[0]

    # To append corresponding dataframes. The original post omits the code that
    # defines sheet_name/df; a plausible reconstruction using pandas (needs openpyxl):
    process_daily = pd.DataFrame()
    for sheet_name, df in pd.read_excel(source_url, sheet_name=None).items():
        if sheet_name == 'process - Daily':
            process_daily = pd.concat([process_daily, df])
                    
    # List of all dataframes
    df_list = [process_daily]

    process_daily.attrs['name']='ProcessConsumption'
    
    target_container_client = source_blob_service_client.get_container_client(TARGET_CONTAINER)  
    
    for df in df_list:
        file_name = f"excel_transformed/{df.attrs['name']}.csv"   # Filename created
        output_csv = df.to_csv(index=False)          # Converted df to CSV text

        target_blob_url = generate_sas_token_url(file_name, TARGET_CONTAINER)
        target_blob_client = BlobClient.from_blob_url(target_blob_url)
        target_blob_client.upload_blob(output_csv, overwrite=True)


app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    excel_transform()
    
    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            name = req_body.get('name')

    if name:
        return func.HttpResponse(f"Hello, {name}. This HTTP triggered function executed successfully.")
    else:
        return func.HttpResponse(
             "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
             status_code=200
        )
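With the v2 Python programming model, the host discovers triggers by importing function_app.py, so "No HTTP triggers found" usually means indexing failed: a syntax or indentation error anywhere in the file, or an import of a package missing from requirements.txt (here: azure-functions, pandas, openpyxl, azure-storage-blob), leaves the app with no registered functions. A minimal function_app.py such as the sketch below (the route name is illustrative) is a quick way to check whether the problem is the code or the app configuration:

import logging

import azure.functions as func

# Minimal v2-model app: if even this shows no HTTP trigger after deployment,
# the problem is runtime/app-settings configuration rather than the code.
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.route(route="ping")
def ping(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('ping received')
    return func.HttpResponse("pong", status_code=200)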

local.settings.json:

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true"
  }
}
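Note that local.settings.json only affects local runs; the deployed Function App needs the same flag in its application settings. Assuming the Azure CLI is available, it can be set with `az functionapp config appsettings set --name <app-name> --resource-group <resource-group> --settings AzureWebJobsFeatureFlags=EnableWorkerIndexing` (recent Functions v4 hosts index the v2 model without this flag, but older ones require it, and setting it is harmless).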

host.json:

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  },
  "extensions": 
  { 
    "http": 
      { 
      "routePrefix": "api", 
      "maxOutstandingRequests": 200, 
      "maxConcurrentRequests": 100, 
      "dynamicThrottlesEnabled": true, 
      "hsts": { "isEnabled": true, "maxAge": "10" }, 
      "customHeaders": { "X-Content-Type-Options": "nosniff" } 
      } 
  } 
} 
  • Successfully deployed the function to Azure:

(screenshot: deployment output showing success)

Portal:

(screenshot: the function listed in the Azure portal)
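Once deployed, the endpoint can be verified directly; with the "api" routePrefix from host.json, a quick check looks like the sketch below (the app name is a placeholder):

import requests

# Replace <your-app> with the deployed Function App name.
url = "https://<your-app>.azurewebsites.net/api/http_trigger"
resp = requests.get(url, params={"name": "Test"})
print(resp.status_code)
print(resp.text)  # expected: Hello, Test. This HTTP triggered function executed successfully.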
