我正在构建一个具有四种不同功能的Azure Orchestration Function App。第一个函数是启动任务函数,用于删除 Cosmos DB 数据库内的容器。我遇到问题的第二个函数扫描由多个链接组成的 Cosmos DB 容器并提取一些链接。然后,它将链接保存在新的 Cosmos DB 容器中。
在编排函数中,我使用一个 for 循环,使用不同的参数运行该函数 4 次,实际上,我将有大约 9 次或更多:我希望应用程序接受 A、B、C、D 并完成所有操作。现在又来回了。
谢谢您的帮助
这是我的编排功能:
[Function(nameof(CosmosDBFunction))]
public async Task<string> RunOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
ILogger logger = context.CreateReplaySafeLogger(nameof(CosmosDBFunction));
logger.LogInformation("----Function Started-----");
try
{
await context.CallActivityAsync("StartupTask");
}
catch (Exception ex)
{
logger.LogError($"Error happened StartupTask: {ex.Message}");
}
var parallelTasks = new List<Task<string>>();
var sites = new List<string>() {
"A", "B", "C", "D"
};
for (int i = 0; i < sites.Count; i++)
{
try
{
var task = context.CallActivityAsync<string>("ScanSafetoAContainer", sites[i]);
parallelTasks.Add(task);
}
catch (TaskFailedException ex) when (ex.InnerException is CosmosException cosmosException)
{
_logger.LogError($"Error calling for site {sites[i]}: {ex.Message}");
}
catch (TaskFailedException ex)
{
_logger.LogError($"Error calling {sites[i]}: {ex.Message}");
}
}
await Task.WhenAll(parallelTasks);
context.CallActivityAsync("SaveToAContainer");
try
{
logger.LogInformation("----Function B Started-----");
await context.CallActivityAsync("SaveToXContainer");
}
catch (TaskFailedException ex)
{
logger.LogError($"Error happened : {ex.Message}");
}
try
{
logger.LogInformation("----Function clinksToBlinks Started-----");
await context.CallActivityAsync("SaveToYContainer");
}
catch (TaskFailedException ex)
{
logger.LogError($"Error happened: {ex.Message}");
}
logger.LogInformation("----Function Finished-----");
try
{
logger.LogInformation($"Operation is completed");
return "Operation is completed";
}
catch (Exception ex)
{
return $"Operation is completed: exception at the end line {ex.Message}";
}
}
这是循环内调用的函数:
[Function(nameof(Something))]
[CosmosDBOutput(databaseName: "DB", containerName: "AB",
Connection = "CosmosConnection"
, CreateIfNotExists = true, PartitionKey = "/something")]
public async Task<string> SaveAToContainer
([ActivityTrigger] string site,
FunctionContext executionContext)
{
ConcurrentBag<Object> links = new ConcurrentBag<Object>();
ILogger _logger = executionContext
.GetLogger(nameof(Something));
var container = await CreateContainer();
var client = _httpClientFactory;
client.Timeout = TimeSpan.FromSeconds(60);
using FeedIterator<object> object = container.GetItemLinqQueryable<Page>()
.Where(s => s.prop == prop)
.ToFeedIterator<Object>();
while (pages.HasMoreResults)
{
long counter = 0;
foreach (var page in await pages.ReadNextAsync())
{
string href = string.Empty;
try
{
HttpResponseMessage responseFromUrl = await client.GetAsync(page.Url);
var htmlContent = await responseFromUrl.Content.ReadAsStringAsync();
var doc = new HtmlDocument();
doc.LoadHtml(htmlContent);
HtmlNode divNode = doc.DocumentNode.SelectSingleNode("//div[@id='ga-maincontent']");
HtmlNodeCollection anchorTags = doc.DocumentNode
.SelectNodes("//div[@id='ga-maincontent']//a[@href]");
if (anchorTags != null)
{
foreach (HtmlNode anchorTag in anchorTags)
{
href = anchorTag.GetAttributeValue("href", string.Empty);
if (href.StartsWith("http"))
{
long uniqueId = Interlocked.Increment(ref counter);
links.Add(new Link
{
Id = $"{Guid.NewGuid().ToString()}{Interlocked.Increment(ref counter)}",
PropA= something
});
}
}
}
}
catch (Exception ex)
{
links.Add(new Link
{
Id = $"{Guid.NewGuid().ToString()}{Interlocked.Increment(ref counter)}",
PropA= something
});
_logger.LogInformation($"Http request exception :" +
$" Message :{ex.Message} | Status Code {ex.Source} | {ex.Source} ");
}
//if (i >= pages.) break;
}
}
string scannedLinks = string.Empty;
try
{
ss = JsonConvert.SerializeObject(links);
}
catch (Exception ex)
{
_logger.LogInformation($"Parallel Func Insert Links : {ex.Message} ");
}
return ss;
}
我曾经使用列表而不是
ConcurrentBag
,但后来我认为这不是线程安全的,为什么应用程序没有按照我的预期执行。但是,我觉得我很好地使用了 async 和 wait 关键字来确保应用程序运行并等待任务完成。
我真正想做的是:当我循环执行任务时:A、B、C和D将完成并全部准备好保存到cosmos db,然后我们进入第三个函数。
非常感谢任何有关我做错的事情的建议。
要解决 Azure Function Orchestration 中的并行执行问题,请确保每项任务在开始下一组任务之前完成。以下是修改编排功能的方法:
csharp
await Task.WhenAll(parallelTasks);
// Ensure all tasks are completed before moving on
await context.CallActivityAsync("SaveToAContainer");
await context.CallActivityAsync("SaveToXContainer");
await context.CallActivityAsync("SaveToYContainer");