我需要并行执行一次 Azure 函数(我们称之为 OperationalFunction)。我的意思是:
我有两个“入口点”功能:
我有一个 BlobTrigger 函数 (BlobTriggeredStartFunction) 我有一个 HttpTrigger 函数 (HttpTriggeredStartFunction) 这两个函数都调用协调器函数 (OrchestratorFunction),如下所示:
string instanceId = await starter.StartNewAsync(nameof(OrchestrationFunction),null,data);
这个编排函数有以下代码:
[FunctionName("OrchestrationFunction")]
public async Task OrchestrationFunction(
[OrchestrationTrigger] IDurableOrchestrationContext context){
var input = context.GetInput<string>();
await context.CallActivityAsync(nameof(OperationalFunction),input);
}
我希望 *OperationalFunction * 仅并行执行一次,这样,如果我收到多个触发器(n blob/http 触发事件),则 *OperationalFunction * 在某一时间仅执行一个实例,当这样的实例结束,然后执行队列(先进先出)中的新事件。
哪种方法最好?
我尝试通过添加以下内容来编辑 host.json 文件:
"blobs": {
"maxDegreeOfParallelism": 1,
"poisonBlobThreshold": 1
}
但这并不完全是我想要的,只是解决了一部分问题。
我还尝试添加一个循环 do...while 来获取协调器的状态:
do {
status = await starter.GetStatusAsync(instanceId);
await Task.Delay(2000);
}
while (
status.RuntimeStatus == OrchestrationRuntimeStatus.Running ||
status.RuntimeStatus == OrchestrationRuntimeStatus.Pending);
但我不认为这是最有效的解决方案。
我创建了一个具有两个触发器(Blob 触发器和 HTTP 触发器)的示例持久编排,并仅并行成功执行了一次
OperationalFunction
。
host.json
中配置以下设置,我们可以确保OperationalFunction
在所有触发器中一次仅运行一个实例。 "durableTask": {
"maxConcurrentActivityFunctions": 1,
"maxConcurrentOrchestrations": 1
}
下面是使用 Blob 触发器和 HTTP 触发器函数进行持久编排的完整代码。
HttpTriggeredStartFunction.cs:
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.Http;
namespace FunctionApp5.Triggers
{
public class HttpTriggeredStartFunction
{
[FunctionName("HttpTriggeredStartFunction")]
public async Task<IActionResult> RunAsync(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
log.LogInformation("HTTP Triggered start function hit.");
string requestData = await new StreamReader(req.Body).ReadToEndAsync();
await starter.StartNewAsync("OrchestrationFunction", null, requestData);
return new OkObjectResult("HTTP orchestration started successfully.");
}
}
}
BlobTriggeredStartFunction.cs:
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Threading.Tasks;
namespace FunctionApp5.Triggers
{
public class BlobTriggeredStartFunction
{
[FunctionName("BlobTriggeredStartFunction")]
public async Task RunAsync(
[BlobTrigger("kamcontainer/{name}", Connection = "AzureWebJobsStorage")] Stream blobStream,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
log.LogInformation($"Blob trigger received for: {blobStream}");
await starter.StartNewAsync("OrchestrationFunction", null, "blob-data");
}
}
}
操作函数.cs:
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
namespace FunctionApp5.Activities
{
public class OperationalFunction
{
[FunctionName("OperationalFunction")]
public async Task<string> RunAsync([ActivityTrigger] string input, ILogger log)
{
log.LogInformation($"OperationalFunction execution started with input: {input}");
await Task.Delay(5000);
log.LogInformation("OperationalFunction execution finished.");
return "Success";
}
}
}
OrchestrationFunction.cs:
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
namespace FunctionApp5.Orchestrator
{
public class OrchestrationFunction
{
[FunctionName("OrchestrationFunction")]
public async Task OrchestrationFunctionAsync(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger log)
{
log.LogInformation("Starting orchestration.");
var input = context.GetInput<string>();
await context.CallActivityAsync<string>("OperationalFunction", input);
log.LogInformation("Operational Function finished execution.");
}
}
}
输出:
Durable Orchestration 功能执行成功,Blob 触发功能的日志如下所示。
我成功使用Postman向HTTP触发函数发送了
POST
请求,如下所示。
发布请求后我得到了以下功能日志。