是否可以确保 Azure Function 仅并行执行一次?

问题描述 投票:0回答:1

我需要并行执行一次 Azure 函数(我们称之为 OperationalFunction)。我的意思是:

我有两个“入口点”功能:

我有一个 BlobTrigger 函数 (BlobTriggeredStartFunction) 我有一个 HttpTrigger 函数 (HttpTriggeredStartFunction) 这两个函数都调用协调器函数 (OrchestratorFunction),如下所示:

string instanceId = await starter.StartNewAsync(nameof(OrchestrationFunction),null,data);

这个编排函数有以下代码:

[FunctionName("OrchestrationFunction")]
public async Task OrchestrationFunction(
[OrchestrationTrigger] IDurableOrchestrationContext context){
   var input = context.GetInput<string>();
   await context.CallActivityAsync(nameof(OperationalFunction),input);
}

我希望 *OperationalFunction * 仅并行执行一次,这样,如果我收到多个触发器(n blob/http 触发事件),则 *OperationalFunction * 在某一时间仅执行一个实例,当这样的实例结束,然后执行队列(先进先出)中的新事件。

哪种方法最好?

我尝试通过添加以下内容来编辑 host.json 文件:

    "blobs": {
      "maxDegreeOfParallelism": 1,
      "poisonBlobThreshold": 1
    }

但这并不完全是我想要的,只是解决了一部分问题。

我还尝试添加一个循环 do...while 来获取协调器的状态:

                do {
                    status = await starter.GetStatusAsync(instanceId);
                    await Task.Delay(2000);
                }
                while (
                    status.RuntimeStatus == OrchestrationRuntimeStatus.Running || 
                    status.RuntimeStatus == OrchestrationRuntimeStatus.Pending);

但我不认为这是最有效的解决方案。

azure parallel-processing azure-functions azure-blob-trigger azure-http-trigger
1个回答
0
投票

我创建了一个具有两个触发器(Blob 触发器和 HTTP 触发器)的示例持久编排,并仅并行成功执行了一次

OperationalFunction

  • 通过在
    host.json
    中配置以下设置,我们可以确保
    OperationalFunction
    在所有触发器中一次仅运行一个实例。
  "durableTask": {
    "maxConcurrentActivityFunctions": 1,
    "maxConcurrentOrchestrations": 1
  }

下面是使用 Blob 触发器和 HTTP 触发器函数进行持久编排的完整代码。

HttpTriggeredStartFunction.cs:

using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.Http;

namespace FunctionApp5.Triggers
{
    public class HttpTriggeredStartFunction
    {
        [FunctionName("HttpTriggeredStartFunction")]
        public async Task<IActionResult> RunAsync(
            [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger log)
        {
            log.LogInformation("HTTP Triggered start function hit.");
            string requestData = await new StreamReader(req.Body).ReadToEndAsync();
            await starter.StartNewAsync("OrchestrationFunction", null, requestData);
            return new OkObjectResult("HTTP orchestration started successfully.");
        }
    }
}

BlobTriggeredStartFunction.cs:

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Threading.Tasks;

namespace FunctionApp5.Triggers
{
    public class BlobTriggeredStartFunction
    {
        [FunctionName("BlobTriggeredStartFunction")]
        public async Task RunAsync(
            [BlobTrigger("kamcontainer/{name}", Connection = "AzureWebJobsStorage")] Stream blobStream,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger log)
        {
            log.LogInformation($"Blob trigger received for: {blobStream}");
            await starter.StartNewAsync("OrchestrationFunction", null, "blob-data");
        }
    }
}

操作函数.cs:

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;

namespace FunctionApp5.Activities
{
    public class OperationalFunction
    {
        [FunctionName("OperationalFunction")]
        public async Task<string> RunAsync([ActivityTrigger] string input, ILogger log)
        {
            log.LogInformation($"OperationalFunction execution started with input: {input}");
            await Task.Delay(5000);
            log.LogInformation("OperationalFunction execution finished.");
            return "Success";
        }
    }
}

OrchestrationFunction.cs:

using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;

namespace FunctionApp5.Orchestrator
{
    public class OrchestrationFunction
    {
        [FunctionName("OrchestrationFunction")]
        public async Task OrchestrationFunctionAsync(
            [OrchestrationTrigger] IDurableOrchestrationContext context,
            ILogger log)
        {
            log.LogInformation("Starting orchestration.");
            var input = context.GetInput<string>();
            await context.CallActivityAsync<string>("OperationalFunction", input);
            log.LogInformation("Operational Function finished execution.");
        }
    }
}

输出:

Durable Orchestration 功能执行成功,Blob 触发功能的日志如下所示。

enter image description here

我成功使用Postman向HTTP触发函数发送了

POST
请求,如下所示。

enter image description here

发布请求后我得到了以下功能日志。

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.