Azure 耐用功能卡住了

问题描述 投票:0回答:1

我在.net6中创建了一个azure持久函数,这里是它的代码片段

我面临两个问题

  1. 尽管所有活动函数执行都毫无例外地完成,但编排器函数仍无法完成
  2. 有时活动会执行多次

观察控制台日志,发现 log.LogInformation("GenerateScheduledReports: 所有报告生成任务已完成。");从未执行过,也没有执行过它的异常块。

代码:

[FunctionName("TimerTrigger")]
public static async Task TimerTrigger(
    [TimerTrigger("%ReportSchedule%", RunOnStartup = true)] TimerInfo timer,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    string instanceId = "OrchestratorFunction";

    // Check the status of the running orchestration
    var existingInstance = await starter.GetStatusAsync(instanceId);

    if (existingInstance != null && existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Running)
    {
        TimeSpan runningTime = DateTime.Now - existingInstance.CreatedTime;

        if (runningTime.TotalMinutes > 30)
        {
            log.LogWarning($"Orchestrator with ID = '{instanceId}' is running for {runningTime.TotalMinutes} minutes. Terminating...");
            await starter.TerminateAsync(instanceId, "Orchestration running too long. Terminated.");
            log.LogInformation($"Orchestrator with ID = '{instanceId}' has been terminated.");
        }
        else
        {
            log.LogInformation($"Orchestrator with ID = '{instanceId}' is running for {runningTime.TotalMinutes} minutes. Letting it continue.");
            return; // Skip starting a new instance
        }
    }

    log.LogInformation("No active or acceptable orchestrator instance found. Starting a new one...");
    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
    await starter.StartNewAsync("OrchestratorFunction", instanceId);
}

[FunctionName("OrchestratorFunction")]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
    ILogger log)
{
    try
    {
        if (!context.IsReplaying)
        {
            log.LogInformation("OrchestratorFunction: START");
            
            await GenerateScheduledReports(context, log);

            log.LogInformation("OrchestratorFunction: END");
        }
    }
    catch (Exception ex)
    {
        log.LogError($"OrchestratorFunction: EXCEPTION : {ex.ToString()}");
    }
}

private static async Task GenerateScheduledReports(IDurableOrchestrationContext context, ILogger log)
{
    log.LogInformation("GenerateScheduledReports: START");
    List<Task> _listTasks = new List<Task>();
    var reportGenerationList = ReportSettingService.GetClientListForReportGeneration(log);

    foreach (var reportGeneration in reportGenerationList)
    {
        if (reportGeneration.ReportType == Constant.RECON_REPORT_TYPE_STRING)
        {
            _listTasks.Add(context.CallActivityAsync("GenerateReconciliationReport", reportGeneration));
        }
        else if (reportGeneration.ReportType == Constant.QTRREBATE_REPORT_TYPE_STRING)
        {
            _listTasks.Add(context.CallActivityAsync("GenerateQuarterlyRebateReport", reportGeneration));
        }
    }
    try
    {
        if (_listTasks.Any())
        {
            log.LogInformation("GenerateScheduledReports: Waiting for all report generation tasks to complete...");
            await Task.WhenAll(_listTasks);
            log.LogInformation("GenerateScheduledReports: All report generation tasks completed.");
        }
        else
        {
            log.LogWarning("GenerateScheduledReports: No tasks to process in GenerateReports.");
        }
    }
    catch (Exception ex)
    {
        log.LogError($"GenerateScheduledReports: EXCEPTION: {ex.Message}");
        //throw; // Re-throw to ensure orchestration logs the failure
    }
    log.LogInformation("GenerateScheduledReports: END");
}

#region -- Common activity functions used in scheduled and retry orchestrator functions --
[FunctionName("GenerateReconReport")]
public static async Task GenerateReconReport([ActivityTrigger] ReportGenerationModel reportGeneration, ExecutionContext executionContext, ILogger log)
{
    log.LogInformation($"Client [{reportGeneration.ClientId}-{reportGeneration.ReportType}] Step 1: Start Date {reportGeneration.InvoiceStartDate.GetValueOrDefault().ToString("MM/dd/yyyy")}, End Date {reportGeneration.InvoiceEndDate.GetValueOrDefault().ToString("MM/dd/yyyy")}");
    var reconReport = new ReconReport();
    await reconReport.GenerateReport(reportGeneration,
        executionContext,
        log);
}

[FunctionName("GenerateQuarterlyReport")]
public static async Task GenerateQuarterlyRebateReport([ActivityTrigger] ReportGenerationModel reportGeneration, ExecutionContext executionContext, ILogger log)
{
    log.LogInformation($"Client [{reportGeneration.ClientId}-{reportGeneration.ReportType}] Step 1: - Start Date {reportGeneration.InvoiceStartDate.GetValueOrDefault().ToString("MM/dd/yyyy")}, End Date {reportGeneration.InvoiceEndDate.GetValueOrDefault().ToString("MM/dd/yyyy")}");

    var quarterlyReport = new QuarterlyRebateStatementGenerateReport();
    await quarterlyReport.GenerateReport(reportGeneration,
                    executionContext,
                    log);
}
#endregion

主机.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      },
      "enableLiveMetricsFilters": true
    }
  },
  "durableTask": {
    "maxConcurrentActivityFunctions": 30, // Concurrency not relevant for sequential execution
    "maxConcurrentOrchestratorFunctions": 1, // Only one orchestrator runs at a time
    "controlQueueBatchSize": 50,
    "taskHub": "DurableTaskHub",
    "storageProvider": {
      "connectionStringName": "BlobConnectionString", // Use the correct key name here
      "maxQueuePollingIntervalMs": 2000
    }
  },
  "functionTimeout": "02:00:00",
  "extensions": {
    "durableTask": {
      "hubName": "MyHubName"
    }
  }
}
c# azure-functions azure-durable-functions
1个回答
0
投票

不要这样做:

    try
    {
        if (!context.IsReplaying)
        {
            await GenerateScheduledReports(context, log);
        }
    }

你应该只调用该方法:

    try
    {
        await GenerateScheduledReports(context, log);
    }

持久功能的一个重要部分是编排器必须是“确定性的”。 这意味着重播时它必须在每次运行时执行相同的 CallActivityAsync 调用。 你的 GenerateScheduledReports 方法看起来不错。 但是,如果您不希望由于重播而出现重复的日志消息,请在上下文中使用

CreateReplaySafeLogger
方法来包装您的记录器,以便它仅在我们不重播时记录每条消息。
请注意,它不会运行该活动两次。 Durable Functions 会跳过活动调用,仅使用重播时活动的结果来解析任务。

您的代码不起作用的原因是在安排活动后,协调器退出。 然后,一旦活动完成,它就会重播协调器并继续进行。 但是因为方法调用周围有一个 if 块,所以它永远不会在重播时出现。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.