我有一个网络作业,它从azure事件中心接收网站点击事件,然后将这些事件吸收到ADX中。
public static async Task Run([EventHubTrigger] EventData[] events, ILogger logger)
{
// Process events
try
{
var ingestResult = await _adxIngester.IngestAsync(events);
if (!ingestResult)
{
AppInsightLogError();
logger.LogError();
}
}
catch(Exception ex)
{
AppInsighLogError();
logger.LogError()
}
}
我在使用ADX进行摄取时使用了queue ingestion,然后将其关闭了FlushImmediately
,从而可以批量摄取。如果事件不符合默认的IngestionBatch策略(1000个事件/ 1GB数据大小),则ADX等待5 minutes直到返回Success
状态,这使Run
也等待该时间。
public async Task<bool> IngestAsync(...)
{
IKustoQueuedIngestClient client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionString);
var kustoIngestionProperties = new KustoQueuedIngestionProperties(databaseName: "myDB", tableName: "events")
{
ReportLevel = IngestionReportLevel.FailuresOnly,
ReportMethod = IngestionReportMethod.Table,
FlushImmediately = false
};
var streamIdentifier = Guid.NewGuid();
var clientResult = await client.IngestFromStreamAsync(...);
var ingestionStatus = clientResult.GetIngestionStatusBySourceId(streamIdentifier);
while (ingestionStatus.Status == Status.Pending)
{
await Task.Delay(TimeSpan.FromSeconds(15));
ingestionStatus = clientResult.GetIngestionStatusBySourceId(streamIdentifier);
}
if (ingestionStatus.Status == Status.Failed)
{
return false;
}
return true;
}
由于我不想让我的网络作业在没有太多事件发生或只是QA起作用的情况下等待那么长时间,所以进行了以下更改:
await
上使用IngestAsync
,因此使Run
成为同步方法Action onError
添加到IngestAsync
,并在提取任务失败时调用它。在AppInsightLogError()
中调用logger.LogError()
和onError
,而不是返回false
IngestFromStreamAsync
替换IngestFromStream
[基本上,我想确保事件到达Azure Queue并引发异常(如果有),然后再轮询获取状态,然后退出Run
方法,并且我不必等待状态轮询,如果发生任何故障则这将是日志。
我的问题是:
摄取过程分几个阶段进行。一个阶段在客户端完成,而一个阶段在服务器端完成:
IngestFromStreamAsync
”将您的数据上传到Blob并将消息发布到“数据管理”输入队列。它不会等待聚合时间,最终状态为Queued。FlushImmediately
默认为false。