当将少量数据摄入到天蓝色数据浏览器时,避免webjob等待

问题描述 投票:0回答:2

我有一个网络作业,它从azure事件中心接收网站点击事件,然后将这些事件吸收到ADX中。

public static async Task Run([EventHubTrigger] EventData[] events, ILogger logger)
{
    // Process events
    try
    {
        var ingestResult = await _adxIngester.IngestAsync(events);
        if (!ingestResult) 
        {
            AppInsightLogError();
            logger.LogError();
        }

    }
    catch(Exception ex)
    {
        AppInsighLogError();
        logger.LogError()
    }
}

我在使用ADX进行摄取时使用了queue ingestion,然后将其关闭了FlushImmediately,从而可以批量摄取。如果事件不符合默认的IngestionBatch策略(1000个事件/ 1GB数据大小),则ADX等待5 minutes直到返回Success状态,这使Run也等待该时间。

public async Task<bool> IngestAsync(...) 
{
    IKustoQueuedIngestClient client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionString);

    var kustoIngestionProperties = new KustoQueuedIngestionProperties(databaseName: "myDB", tableName: "events")
    {
        ReportLevel = IngestionReportLevel.FailuresOnly,
        ReportMethod = IngestionReportMethod.Table,
        FlushImmediately = false
    };

    var streamIdentifier = Guid.NewGuid();
    var clientResult = await client.IngestFromStreamAsync(...);
    var ingestionStatus = clientResult.GetIngestionStatusBySourceId(streamIdentifier);

    while (ingestionStatus.Status == Status.Pending)
    {
        await Task.Delay(TimeSpan.FromSeconds(15));
        ingestionStatus = clientResult.GetIngestionStatusBySourceId(streamIdentifier);
    }

    if (ingestionStatus.Status == Status.Failed) 
    {
        return false;
    }

    return true;
}

由于我不想让我的网络作业在没有太多事件发生或只是QA起作用的情况下等待那么长时间,所以进行了以下更改:

  • 不要在await上使用IngestAsync,因此使Run成为同步方法
  • 将参数Action onError添加到IngestAsync,并在提取任务失败时调用它。在AppInsightLogError()中调用logger.LogError()onError,而不是返回false
  • IngestFromStreamAsync替换IngestFromStream

[基本上,我想确保事件到达Azure Queue并引发异常(如果有),然后再轮询获取状态,然后退出Run方法,并且我不必等待状态轮询,如果发生任何故障则这将是日志。

我的问题是:

  • 避免Webjob等待几分钟是一种好习惯吗?如果没有,为什么?
  • 如果是,我的解决方案是否足以解决此问题?否则如何我应该这样做吗?
azure-webjobs azure-eventhub kusto azure-data-explorer
2个回答
0
投票

摄取过程分几个阶段进行。一个阶段在客户端完成,而一个阶段在服务器端完成:

  1. 您正在使用的摄取客户端代码将获取您的流并将其上传到Blob,然后它将消息发送到队列。在该阶段抛出的任何异常的确会传播到您的代码中,这就是为什么您还应该使用try-catch块的原因,在catch块中,您可以按照建议记录错误消息。您可以将IngestFromStreamAsync与await关键字一起使用,也可以使用IngestFromStream。如果您想释放辅助线程并节省资源,则第一种选择更好。但是,在这两者之间进行选择与轮询没有任何关系。轮询与第二阶段有关。
  2. Kusto的DataManagement组件一直在侦听队列中的消息,因此,当它到达您的新消息时,它将立即读取并查看有关新提取请求的一些元数据信息,例如存储数据的Blob URI以及应该更新失败/进度的Azure表。该阶段由服务器端远程完成,您可以选择等待客户端代码中的每一次摄取,然后轮询直到服务器完成摄取过程。如果在此阶段中有任何异常,那么当然不会将它们传播到您的客户端代码,但是您将能够检查Azure表并查看发生了什么。您还可以决定推迟该状态检查,并在其他任务中完成它。

0
投票
  • [IngestFromStreamAsync”将您的数据上传到Blob并将消息发布到“数据管理”输入队列。它不会等待聚合时间,最终状态为Queued
  • [FlushImmediately默认为false。
  • 如果没有任何其他处理,请考虑使用Event Hub to Kusto connection
© www.soinside.com 2019 - 2024. All rights reserved.