Azure Function Orchestration触发并行问题

问题描述 投票:0回答:1

我正在构建一个具有四种不同功能的Azure Orchestration Function App。第一个函数是启动任务函数,用于删除 Cosmos DB 数据库内的容器。我遇到问题的第二个函数扫描由多个链接组成的 Cosmos DB 容器并提取一些链接。然后,它将链接保存在新的 Cosmos DB 容器中。

在编排函数中,我使用一个 for 循环,使用不同的参数运行该函数 4 次,实际上,我将有大约 9 次或更多:我希望应用程序接受 A、B、C、D 并完成所有操作。现在又来回了。

谢谢您的帮助

这是我的编排功能:

[Function(nameof(CosmosDBFunction))]
public async Task<string> RunOrchestrator(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    ILogger logger = context.CreateReplaySafeLogger(nameof(CosmosDBFunction));
    logger.LogInformation("----Function Started-----");

    try
    {
        await context.CallActivityAsync("StartupTask");
    }
    catch (Exception ex)
    {
        logger.LogError($"Error happened StartupTask: {ex.Message}");
    }

    var parallelTasks = new List<Task<string>>();
    var sites = new List<string>() {
        "A", "B", "C", "D"
    };

    for (int i = 0; i < sites.Count; i++)
    {
        try
        {
            var task = context.CallActivityAsync<string>("ScanSafetoAContainer", sites[i]);
            parallelTasks.Add(task);
        }
        catch (TaskFailedException ex) when (ex.InnerException is CosmosException cosmosException)
        {
            _logger.LogError($"Error calling for site {sites[i]}: {ex.Message}");
        }
        catch (TaskFailedException ex)
        {
            _logger.LogError($"Error calling {sites[i]}: {ex.Message}");
        }
    }

    await Task.WhenAll(parallelTasks);

    context.CallActivityAsync("SaveToAContainer");

    try
    {
        logger.LogInformation("----Function B Started-----");
        await context.CallActivityAsync("SaveToXContainer");
    }
    catch (TaskFailedException ex)
    {
        logger.LogError($"Error happened : {ex.Message}");
    }

    try
    {
        logger.LogInformation("----Function clinksToBlinks Started-----");
        await context.CallActivityAsync("SaveToYContainer");
    }
    catch (TaskFailedException ex) 
    { 
        logger.LogError($"Error happened: {ex.Message}");
    }

    logger.LogInformation("----Function Finished-----");

    try
    {
        logger.LogInformation($"Operation is completed");
        return "Operation is completed";
    }
    catch (Exception ex)
    {
        return $"Operation is completed: exception at the end line {ex.Message}";
    }
}

这是循环内调用的函数:

[Function(nameof(Something))]
[CosmosDBOutput(databaseName: "DB", containerName: "AB",
      Connection = "CosmosConnection"
     , CreateIfNotExists = true, PartitionKey = "/something")]
public async Task<string> SaveAToContainer
    ([ActivityTrigger] string site,
     FunctionContext executionContext)
{
    ConcurrentBag<Object> links = new ConcurrentBag<Object>();
    ILogger _logger = executionContext
                           .GetLogger(nameof(Something));

    var container = await CreateContainer();
    var client = _httpClientFactory;
    client.Timeout = TimeSpan.FromSeconds(60);

    using FeedIterator<object> object = container.GetItemLinqQueryable<Page>()
        .Where(s => s.prop == prop)
        .ToFeedIterator<Object>();

    while (pages.HasMoreResults)
    {
        long counter = 0;

        foreach (var page in await pages.ReadNextAsync())
        {
            string href = string.Empty;

            try
            {
                HttpResponseMessage responseFromUrl = await client.GetAsync(page.Url);
                var htmlContent = await responseFromUrl.Content.ReadAsStringAsync();
                var doc = new HtmlDocument();
                doc.LoadHtml(htmlContent);

                HtmlNode divNode = doc.DocumentNode.SelectSingleNode("//div[@id='ga-maincontent']");
                HtmlNodeCollection anchorTags = doc.DocumentNode
               .SelectNodes("//div[@id='ga-maincontent']//a[@href]");

                if (anchorTags != null)
                {
                    foreach (HtmlNode anchorTag in anchorTags)
                    {
                        href = anchorTag.GetAttributeValue("href", string.Empty);

                        if (href.StartsWith("http"))
                        {
                            long uniqueId = Interlocked.Increment(ref counter);
                            links.Add(new Link
                            {
                                Id = $"{Guid.NewGuid().ToString()}{Interlocked.Increment(ref counter)}",
                                 PropA= something
                            });
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                links.Add(new Link
                {
                    Id = $"{Guid.NewGuid().ToString()}{Interlocked.Increment(ref counter)}",
                    PropA= something
                });

                _logger.LogInformation($"Http request exception :" +
                    $" Message :{ex.Message} | Status Code {ex.Source} | {ex.Source} ");
            }

            //if (i >= pages.) break;
        }
    }

    string scannedLinks = string.Empty;

    try
    {
        ss = JsonConvert.SerializeObject(links);
    }
    catch (Exception ex)
    {
        _logger.LogInformation($"Parallel Func Insert Links : {ex.Message} ");
    }

    return ss;
} 

我曾经使用列表而不是

ConcurrentBag
,但后来我认为这不是线程安全的,为什么应用程序没有按照我的预期执行。但是,我觉得我很好地使用了 async 和 wait 关键字来确保应用程序运行并等待任务完成。

我真正想做的是:当我循环执行任务时:A、B、C和D将完成并全部准备好保存到cosmos db,然后我们进入第三个函数。

非常感谢任何有关我做错的事情的建议。

c# .net async-await azure-functions azure-durable-functions
1个回答
-1
投票

要解决 Azure Function Orchestration 中的并行执行问题,请确保每项任务在开始下一组任务之前完成。以下是修改编排功能的方法:

  1. 使用 Task.WhenAll 并行运行多个任务并等待所有任务完成。
  2. 避免在没有适当处理的情况下混合 async/await 和 Task。 这是修改后的方法:

csharp

await Task.WhenAll(parallelTasks);

// Ensure all tasks are completed before moving on
await context.CallActivityAsync("SaveToAContainer");
await context.CallActivityAsync("SaveToXContainer");
await context.CallActivityAsync("SaveToYContainer");
© www.soinside.com 2019 - 2024. All rights reserved.