在Azure函数中未找到HTTP触发器
我在VS Code里用Python写了一个Azure函数,目的是把Excel表格进行转换,然后把结果存储到Azure的Blob存储里。在本地测试的时候一切都很顺利。可是当我把这个Azure函数部署到Azure门户上时,它却显示“没有找到HTTP触发器”。我用的是下面的代码:
import logging
import azure.functions as func
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
import pandas as pd
from datetime import datetime
from azure.storage.blob import BlobServiceClient, generate_container_sas, ContainerSasPermissions, generate_blob_sas, BlobSasPermissions, ContainerClient
from azure.storage.blob import generate_blob_sas
from datetime import timedelta
import os
# Module-level log call: it sits outside every function definition, so it runs when
# the module is loaded rather than once per request.
logging.info('fn started')
# excel_transform: lists the blobs in the source container, builds a SAS URL for each
# one, and uploads the combined 'process - Daily' data to the target container as CSV.
# NOTE(review): indentation was lost when this snippet was posted, so the nesting of
# the statements below cannot be confirmed from the text alone.
def excel_transform():
logging.info('transform fn triggered')
ACCOUNTNAME = "your-account-name"
KEY = "your-account-key"
TARGET_CONTAINER = "ur-target-container" # uncomment and give target container when used in azure
SOURCE_CONTAINER = "ur-source-container"
# Create a BlobServiceClient using your account URL and credentials
# NOTE(review): the trailing "))" is unbalanced, which is a SyntaxError, and
# account_url is a blank placeholder string.
source_blob_service_client = BlobServiceClient (account_url=f" ", credential=KEY))
# Get a reference to the Blob Container
source_container_client = source_blob_service_client.get_container_client(SOURCE_CONTAINER)
# List all blobs (files) in the source container
# NOTE(review): source_blobs is not read again in this snippet.
source_blobs = source_container_client.list_blobs()
# Define the desired permissions and expiration for the SAS tokens
# NOTE(review): the trailing comma makes `permission` a one-element tuple; neither
# `permission` nor `expiry` is read again (the helper below builds its own values).
permission=BlobSasPermissions(read=True,write=True,add=True,create=True,execute=True,move=True,ownership=True,permissions=True),
expiry=datetime.utcnow() + timedelta(hours=6)
# Create and print SAS tokens and URLs for each blob(file)
# NOTE(review): chem_daily is not referenced again in this snippet.
chem_daily = pd.DataFrame()
# Returns "https://<account>.blob.core.windows.net/<container>/<blob>?<sas>" for the
# given blob name and container, valid for six hours from the time of the call.
def generate_sas_token_url(blob,container): # To generate SAS token and URL
logging.info('sas token created')
# Generate the SAS token for the current blob
sas_token = generate_blob_sas(
container_name=container,
blob_name= blob,
account_name=ACCOUNTNAME,
# NOTE(review): signing with account_key presumably yields a shared-key (service)
# SAS, not a user delegation SAS as the trailing comment says - confirm against
# the azure-storage-blob documentation.
account_key=KEY, # Use account_key for generating a user delegation SAS token
permission=BlobSasPermissions(read=True,write=True,add=True,create=True,execute=True,move=True,ownership=True,permissions=True),
expiry=datetime.utcnow() + timedelta(hours=6)
)
# Create the URL with the SAS token
blob_url_with_sas =f"https://{ACCOUNTNAME}.blob.core.windows.net/{container}/{blob}?{sas_token}"
# Only spaces are percent-encoded here; any other reserved character is left as-is.
blob_url_with_sas = blob_url_with_sas.replace(" ", "%20")
return(blob_url_with_sas)
# To read and transform files from source container
def list_source_blob(): return source_container_client.list_blobs()
logging.info('blobs listed')
for blob in list_source_blob():
blob_name = blob.name
source_url = generate_sas_token_url(blob_name,SOURCE_CONTAINER)
logging.info(f'{blob_name} url created')
# NOTE(review): the result of this second listing call is discarded.
list_source_blob()
# To obtain LocationId from filename
excel_file_name = str(blob_name)
file_part = excel_file_name.split('-')
# Takes the text between the first '-' and the following '.'.
# NOTE(review): file_part[1] raises IndexError when the blob name contains no '-'.
LocationId_1 = file_part[1].split('.')
LocationId = LocationId_1[0]
# NOTE(review): the "]" below has no opening bracket, which is a SyntaxError. The code
# that reads the workbook and defines sheet_name / df appears to be missing from the post.
]
# To append corresponding dataframes
# NOTE(review): sheet_name, df and process_daily are not defined anywhere above.
if sheet_name == 'process - Daily':
process_daily = pd.concat([process_daily,df])
# List of all dataframes
df_list = [process_daily]
# The output file name below is derived from this attrs entry.
process_daily.attrs['name']='ProcessConsumption'
# NOTE(review): target_container_client is not used; the upload goes through a SAS URL.
target_container_client = source_blob_service_client.get_container_client(TARGET_CONTAINER)
for df in df_list:
file_name = f"excel_transformed/{df.attrs['name']}.csv" # Filename created
output_csv = df.to_csv(index = False) # Converted df to csv file
target_blob_url = generate_sas_token_url(file_name,TARGET_CONTAINER)
target_blob_client = BlobClient.from_blob_url(target_blob_url)
target_blob_client.upload_blob(output_csv, overwrite = True)
# Function app object; routes registered on it below use anonymous auth level.
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
# HTTP entry point registered under the route "http_trigger".
# Runs excel_transform() first, then answers with a greeting that uses the "name"
# value from the query string or, failing that, from the JSON request body.
# NOTE(review): indentation was lost when this snippet was posted.
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
# Any exception raised by the transform propagates out of this handler.
excel_transform()
name = req.params.get('name')
if not name:
try:
req_body = req.get_json()
except ValueError:
# No usable JSON body: fall through with name still unset.
pass
else:
# NOTE(review): assumes the JSON body is an object; a list or scalar has no .get().
name = req_body.get('name')
if name:
return func.HttpResponse(f"Hello, {name}. This HTTP triggered function executed successfully.")
else:
return func.HttpResponse(
"This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
status_code=200
)
我已经尝试过以下方法,但问题依旧:
1. 降级了 Azure Functions Core Tools
2. 在本地和 Azure 函数应用的配置中添加了必要的设置
1 个回答
0
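我尝试部署问题中给出的代码,但它存在构建(语法)错误:例如创建 `BlobServiceClient` 的那一行多了一个右括号,代码中还有一个孤立的 `]`。只要该 Python 文件因语法错误而无法被导入,Azure 就无法索引其中的函数,门户中也就会显示“未找到HTTP触发器”。因此我同样无法将这个函数部署到Azure上。
我将代码修改如下,现在可以成功将函数部署到Azure,并且触发器也能正常同步。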
import logging
import os
from datetime import datetime, timedelta
import azure.functions as func
import pandas as pd
from azure.storage.blob import (BlobClient, BlobSasPermissions,
BlobServiceClient, ContainerClient,
ContainerSasPermissions, generate_blob_sas,
generate_container_sas)
# Runs when the module is loaded (it is outside every function), not once per request.
logging.info('fn started')


def excel_transform():
    """Merge the 'process - Daily' sheet of every source workbook into one CSV.

    For each blob in the source container the function builds a SAS URL, reads
    the workbook through that URL, and collects the sheet named
    'process - Daily'. The collected sheets are concatenated and uploaded to
    the target container as ``excel_transformed/ProcessConsumption.csv``,
    overwriting any existing blob of that name.

    Nothing is uploaded when no matching sheet was found, so an earlier result
    is not replaced by an empty file.

    Exceptions raised by the Azure SDK or by pandas are not caught here; they
    propagate to the caller.
    """
    # Function-scope imports: these names are not in the file's import block.
    from datetime import timezone
    from urllib.parse import quote

    logging.info('transform fn triggered')
    # NOTE(review): placeholders from the original post. A real account key
    # should come from application settings, not from source code.
    ACCOUNTNAME = "your-account-name"
    KEY = "your-account-key"
    TARGET_CONTAINER = "ur-target-container"  # uncomment and give target container when used in azure
    SOURCE_CONTAINER = "ur-source-container"
    target_sheet = 'process - Daily'

    # Same host that the SAS URLs below are built on, so both stay in sync.
    account_url = f"https://{ACCOUNTNAME}.blob.core.windows.net"
    blob_service_client = BlobServiceClient(account_url=account_url, credential=KEY)
    source_container_client = blob_service_client.get_container_client(SOURCE_CONTAINER)

    def generate_sas_token_url(blob, container):
        """Return a SAS-signed URL for ``blob`` in ``container``, valid for six hours."""
        logging.info('sas token created')
        sas_token = generate_blob_sas(
            container_name=container,
            blob_name=blob,
            account_name=ACCOUNTNAME,
            # Signing with the account key produces a shared-key (service) SAS.
            account_key=KEY,
            # NOTE(review): permission set kept from the original post; consider
            # narrowing it to read-only for sources and create/write for the target.
            permission=BlobSasPermissions(
                read=True, write=True, add=True, create=True,
                execute=True, move=True, ownership=True, permissions=True,
            ),
            # Timezone-aware replacement for the deprecated datetime.utcnow().
            expiry=datetime.now(timezone.utc) + timedelta(hours=6),
        )
        # Percent-encode the whole blob name (not only spaces) but keep '/' so
        # virtual directory paths are preserved.
        encoded_blob = quote(blob, safe="/")
        return f"{account_url}/{container}/{encoded_blob}?{sas_token}"

    daily_frames = []
    logging.info('blobs listed')
    for blob in source_container_client.list_blobs():
        blob_name = blob.name
        source_url = generate_sas_token_url(blob_name, SOURCE_CONTAINER)
        logging.info(f'{blob_name} url created')

        # LocationId is the text between the first '-' and the following '.'.
        # The posted snippet derives it but never uses it, so it is only logged here.
        name_parts = str(blob_name).split('-')
        if len(name_parts) > 1:
            location_id = name_parts[1].split('.')[0]
            logging.info('LocationId %s derived from %s', location_id, blob_name)
        else:
            logging.warning('no LocationId in blob name %s', blob_name)

        # NOTE(review): the posted snippet uses sheet_name and df without ever
        # defining them; this read step is reconstructed and should be confirmed.
        # sheet_name=None makes read_excel return {sheet name: DataFrame}.
        sheets = pd.read_excel(source_url, sheet_name=None)
        for sheet_name, df in sheets.items():
            if sheet_name == target_sheet:
                daily_frames.append(df)

    # Concatenate once after the loop; pd.concat rejects an empty list.
    process_daily = pd.concat(daily_frames) if daily_frames else pd.DataFrame()

    # Output name -> frame; the name becomes the CSV file name.
    outputs = {'ProcessConsumption': process_daily}
    for output_name, frame in outputs.items():
        if frame.empty:
            logging.warning('no data for %s, upload skipped', output_name)
            continue
        file_name = f"excel_transformed/{output_name}.csv"
        output_csv = frame.to_csv(index=False)
        target_blob_url = generate_sas_token_url(file_name, TARGET_CONTAINER)
        target_blob_client = BlobClient.from_blob_url(target_blob_url)
        target_blob_client.upload_blob(output_csv, overwrite=True)
# Function app object; routes registered on it below use anonymous auth level.
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    """Run the Excel transform, then reply with a greeting.

    The optional ``name`` is taken from the query string first and from a JSON
    object in the request body second.

    Args:
        req: The incoming HTTP request.

    Returns:
        A 200 response; personalised when a name was supplied.

    Raises:
        Whatever ``excel_transform()`` raises; it is not caught here.
    """
    logging.info('Python HTTP trigger function processed a request.')
    excel_transform()

    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            # No usable JSON body: carry on without a name.
            req_body = None
        # A valid JSON body can also be a list or a scalar, which has no
        # .get(); only look up the name on an object.
        if isinstance(req_body, dict):
            name = req_body.get('name')

    if name:
        return func.HttpResponse(f"Hello, {name}. This HTTP triggered function executed successfully.")
    return func.HttpResponse(
        "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
        status_code=200,
    )
local.settings.json:
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
"AzureWebJobsStorage": "UseDevelopmentStorage=true"
}
}
host.json:
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions":
{
"http":
{
"routePrefix": "api",
"maxOutstandingRequests": 200,
"maxConcurrentRequests": 100,
"dynamicThrottlesEnabled": true,
"hsts": { "isEnabled": true, "maxAge": "10" },
"customHeaders": { "X-Content-Type-Options": "nosniff" }
}
}
}
- 成功将函数部署到Azure:
门户: