I implemented a data-analysis pipeline with Durable Functions so that each step runs as its own activity function.
Using the Durable Functions extension for VS Code, the execution details are as shown in the figure.
The wait between the "exploratory_data_analysis" instance and the "decision_tree_regression" instance is short. However, "random_forest" takes quite a long time to start after the previous function finishes, which lengthens the overall response time.
The long execution time of "random_forest" itself is unavoidable, but why does starting the instance take so long? The data passed to it is the same as for "ridge_regression" and the others, so the overhead should not come from transferring the data.
Also, is the time shown by this extension the time from instance start until the code returns?
The random_forest function starts last because it is the final activity, scheduled after all the preceding activities. Azure Durable Functions runs activities in the order they appear in the orchestrator code. As I understand it, random_forest is the last activity in your durable function, which is why it shows up at the end. The horizontal bar represents the time from when your function instance was created or started until its last-updated or completion time.
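If the later activities do not depend on each other's results, the sequential `yield` chain can be replaced with the fan-out/fan-in pattern via `context.task_all`, so that random_forest is scheduled alongside the lighter activities instead of after all of them. A minimal sketch of the pattern, driven here by a stub context so it runs outside Azure (in the real app `context` is the `DurableOrchestrationContext` and `call_activity` returns a Task):

```python
def orchestrator(context):
    data = yield context.call_activity("prepare_data", "")
    # Fan out: schedule the independent activities together, then wait for all.
    tasks = [
        context.call_activity(name, {"data": data})
        for name in ("ridge_regression", "k_nearest_neighbor",
                     "decision_tree_regression", "random_forest")
    ]
    results = yield context.task_all(tasks)
    return results

# Stub standing in for DurableOrchestrationContext, for illustration only.
class StubContext:
    def call_activity(self, name, payload):
        return f"{name} done"   # the real call returns a durable Task
    def task_all(self, tasks):
        return list(tasks)      # the real call completes when all tasks finish

def drive(gen):
    # Drive the generator the way the Durable runtime replays an orchestrator.
    value = None
    try:
        while True:
            value = gen.send(value)
    except StopIteration as stop:
        return stop.value

print(drive(orchestrator(StubContext())))
```

With the real runtime, the fanned-out activities can run concurrently (subject to the `maxConcurrentActivityFunctions` limit in host.json), instead of each one waiting for the previous activity to complete.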
I tried a data analysis similar to yours, and my results are below:-
Reference - my SO answer
My function_app.py:-
Make sure the load in your code is distributed properly by passing the data efficiently:-
import pickle
import azure.functions as func
import azure.durable_functions as df
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing # Dataset
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error # MSE(Mean Squared Error)
from sklearn.preprocessing import StandardScaler
app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    instance_id = await client.start_new("orchestrator", None, {})
    # Wait for completion (or fall back to a check-status response) and return it.
    return await client.wait_for_completion_or_create_check_status_response(req, instance_id)
### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> str:
    data = yield context.call_activity("prepare_data", '')
    simple = yield context.call_activity("simple_regression", {"data": data})
    multiple = yield context.call_activity("multiple_regression", {"data": data})
    exploratory_data_analysis = yield context.call_activity("exploratory_data_analysis", {"data": data})
    data_processing = yield context.call_activity("data_processing", {"data": data})
    multivariate_linear_regression = yield context.call_activity("multivariate_linear_regression", {"data": data})
    ridge_regression = yield context.call_activity("ridge_regression", {"data": data})
    k_nearest_neighbor = yield context.call_activity("k_nearest_neighbor", {"data": data})
    decision_tree_regression = yield context.call_activity("decision_tree_regression", {"data": data})
    random_forest = yield context.call_activity("random_forest", {"data": data})
    return "finished"
### activity function ###
@app.activity_trigger(input_name="blank")
def prepare_data(blank: str):
    # prepare data
    california_housing = fetch_california_housing()
    exp_data = pd.DataFrame(california_housing.data, columns=california_housing.feature_names)
    print(exp_data.columns.tolist())  # explanatory variables
    tar_data = pd.DataFrame(california_housing.target, columns=['HousingPrices'])  # target variable
    data = pd.concat([exp_data, tar_data], axis=1)  # merge data
    print("Column Names:")
    print(data.columns.tolist())
    # Delete anomalous values
    data = data[data['HouseAge'] != 52]
    data = data[data['HousingPrices'] != 5.00001]
    # Create useful variables
    data['Household'] = data['Population']/data['AveOccup']
    data['AllRooms'] = data['AveRooms']*data['Household']
    data['AllBedrms'] = data['AveBedrms']*data['Household']
    # Ensure 'MedInc' column doesn't contain null or missing values
    data = data.dropna(subset=['MedInc'])
    # Create a dictionary to store multiple data items
    prepared_data = {
        'data': data.to_dict(),
        'columns': data.columns.tolist(),
        'target_column': 'HousingPrices',
        'MedInc': data['MedInc'].tolist()  # Add 'MedInc' column to the dictionary as a list
    }
    return prepared_data
@app.activity_trigger(input_name="arg")
def simple_regression(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])
        # Handling missing or NaN values
        data.dropna(inplace=True)
        # Selecting the explanatory variable 'MedInc' and target 'HousingPrices'
        X_simple = data[['MedInc']]
        y = data[arg['target_column']]
        # Check lengths of X_simple and y
        if len(X_simple) != len(y):
            return "Lengths of X_simple and y do not match"
        # Initialize and fit the linear regression model
        simple_model = LinearRegression()
        simple_model.fit(X_simple, y)
        # Activity outputs must be JSON-serializable, so return the fitted
        # parameters rather than the estimator object itself.
        return {
            'coef': simple_model.coef_.tolist(),
            'intercept': float(simple_model.intercept_),
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
### multiple regression analysis ###
@app.activity_trigger(input_name="arg")
def multiple_regression(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])
        # Handling missing or NaN values
        data.dropna(inplace=True)
        # Selecting multiple explanatory variables and target 'HousingPrices'
        X_multiple = data.drop(columns=[arg['target_column']])  # Drop the target column
        y = data[arg['target_column']]
        # Check lengths of X_multiple and y
        if len(X_multiple) != len(y):
            return "Lengths of X_multiple and y do not match"
        # Initialize and fit the multiple regression model
        multiple_model = LinearRegression()
        multiple_model.fit(X_multiple, y)
        # Activity outputs must be JSON-serializable, so return the fitted
        # parameters rather than the estimator object itself.
        return {
            'coef': multiple_model.coef_.tolist(),
            'intercept': float(multiple_model.intercept_),
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
@app.activity_trigger(input_name="arg")
def exploratory_data_analysis(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])
        # Perform exploratory data analysis tasks here
        # For example: summary statistics, visualizations, etc.
        # Placeholder - perform some EDA tasks here
        eda_result = "Exploratory Data Analysis performed"
        return {
            'result': eda_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
### data processing ###
@app.activity_trigger(input_name="arg")
def data_processing(arg: dict):
    try:
        # Placeholder - Add data processing steps here
        # This could involve data cleaning, normalization, etc.
        processed_data_result = "Data processing completed"
        return {
            'result': processed_data_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

@app.activity_trigger(input_name="arg")
def multivariate_linear_regression(arg: dict):
    try:
        # Placeholder - Implement Multivariate Linear Regression here
        # Use the processed data from 'arg' dictionary
        mlr_result = "Multivariate Linear Regression completed"
        return {
            'result': mlr_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

### ridge regression ###
@app.activity_trigger(input_name="arg")
def ridge_regression(arg: dict):
    try:
        # Placeholder - Implement Ridge Regression here
        ridge_result = "Ridge Regression completed"
        return {
            'result': ridge_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

### k-nearest neighbor ###
@app.activity_trigger(input_name="arg")
def k_nearest_neighbor(arg: dict):
    try:
        # Placeholder - Implement K-Nearest Neighbor here
        knn_result = "K-Nearest Neighbor completed"
        return {
            'result': knn_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

### decision tree regression ###
@app.activity_trigger(input_name="arg")
def decision_tree_regression(arg: dict):
    try:
        # Placeholder - Implement Decision Tree Regression here
        dt_result = "Decision Tree Regression completed"
        return {
            'result': dt_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

### random forest ###
@app.activity_trigger(input_name="arg")
def random_forest(arg: dict):
    try:
        # Placeholder - Implement Random Forest here
        rf_result = "Random Forest completed"
        return {
            'result': rf_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
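One thing to watch with this layout: activity inputs and outputs are persisted to the task hub as JSON, so a fitted scikit-learn estimator cannot be returned from an activity directly; return the fitted parameters instead (or pickle the model to blob storage and pass a reference). A quick check of the difference:

```python
import json
from sklearn.linear_model import LinearRegression

# Fit a tiny model on a perfect line: y = 2x + 1
model = LinearRegression().fit([[0.0], [1.0], [2.0]], [1.0, 3.0, 5.0])

# The estimator object itself is not JSON-serializable...
try:
    json.dumps({"model": model})
    serializable = True
except TypeError:
    serializable = False
print(serializable)  # False

# ...but its fitted parameters are.
payload = {"coef": model.coef_.tolist(), "intercept": float(model.intercept_)}
print(json.dumps(payload))
```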
In your scenario, random_forest takes a long time to start after the previous activity completes, and this is also because the function's processing is heavy. Whether you run this function locally or in a Function App, it uses the default Azure storage account from the AzureWebJobsStorage connection string. If you want to run the durable function more efficiently, you can point the Durable Functions extension at an additional storage account as follows:-
My local.settings.json with the additional MyStorageAccountAppSetting:-
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=vsiddheshrg989173;AccountKey=xxxxxStOyfWOw==;EndpointSuffix=core.windows.net",
    "MyStorageAccountAppSetting": "DefaultEndpointsProtocol=https;AccountName=silicon1;AccountKey=xxxxxaBKbO/PA+AStKWFvaQ==;EndpointSuffix=core.windows.net",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
  }
}
You need to apply the same settings in your Function App's environment variables.
My host.json referencing MyStorageAccountAppSetting:-
{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.*, 4.0.0)"
  },
  "extensions": {
    "durableTask": {
      "storageProvider": {
        "connectionStringName": "MyStorageAccountAppSetting"
      }
    }
  }
}
You can also add concurrency-limit settings for your activity functions, and increase parallel execution of functions through the partition-count mechanism by specifying a partition count in the host.json file, for example:-
host.json:-
{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.*, 4.0.0)"
  },
  "extensions": {
    "durableTask": {
      "storageProvider": {
        "connectionStringName": "MyStorageAccountAppSetting",
        "partitionCount": 3
      },
      "maxConcurrentActivityFunctions": 10,
      "maxConcurrentOrchestratorFunctions": 10
    }
  }
}
The horizontal time span runs from createdTime (UTC) to lastUpdatedTime (UTC):-
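Those two timestamps are instance metadata fields; polling the statusQueryGetUri from the check-status response returns JSON like this (illustrative values):

```json
{
  "name": "orchestrator",
  "instanceId": "abc123",
  "runtimeStatus": "Completed",
  "createdTime": "2023-11-20T10:00:00Z",
  "lastUpdatedTime": "2023-11-20T10:02:30Z",
  "output": "finished"
}
```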
My random_forest activity triggered correctly with the above settings in my host.json:-
I referred to these two GitHub documents for the host.json settings, and refer to these documents for more insight into improving the performance of Azure Durable Functions:-