ElasticSearch Nest BulkAll 在收到无法从 _bulk 重试的失败后停止

问题描述 投票:0回答:4

使用

BulkAll()
批量插入我收到这个奇怪的错误

BulkAll halted after receiving failures that can not be retried from _bulk

但是,当我检查异常时,我仍然得到成功的响应:

Successful low level call on POST: /cf-lblogs-2019.01.23/cloudflareloadbalancinglogelasticentity/_bulk?

我在这里做错了什么?下面是代码片段:

var waitHandle = new CountdownEvent(1);

var bulk = _client.BulkAll(group.ToList(), a => a
                .Index(_index.Replace("*", string.Empty) + group.Key)
                .BackOffRetries(2)
                .BackOffTime("30s")
                .RefreshOnCompleted(true)
                .MaxDegreeOfParallelism(4)
                .Size(group.Count()));

bulk.Subscribe(new BulkAllObserver(
                onNext: response => _logger.LogInformation($"Indexed {response.Page * group.Count()} with {response.Retries} retries"),
                onError: HandleInsertError,
                onCompleted: () => waitHandle.Signal()
            ));

waitHandle.Wait();


private void HandleInsertError(Exception e)
    {
        var exceptionString = e.ToString(); 
        _logger.LogError(exceptionString);
    }

巢6.4.2.

弹性6.5.4.

c# elasticsearch nest
4个回答
6
投票

就我而言,我已经解决了,如下所示:

        List<string> errors = new List<string>();
        int seenPages = 0;
        int requests = 0;
        CancellationTokenSource tokenSource = new CancellationTokenSource();
        ConcurrentBag<BulkResponse> bulkResponses = new ConcurrentBag<BulkResponse>();
        ConcurrentBag<BulkAllResponse> bulkAllResponses = new ConcurrentBag<BulkAllResponse>();
        ConcurrentBag<items> deadLetterQueue = new ConcurrentBag<items>();
        BulkAllObservable<items> observableBulk = elasticClient.BulkAll(lst, f => f
                .MaxDegreeOfParallelism(Environment.ProcessorCount)
                .BulkResponseCallback(r =>
                {
                    bulkResponses.Add(r);
                    Interlocked.Increment(ref requests);
                })
                .ContinueAfterDroppedDocuments()
                .DroppedDocumentCallback((r, o) =>
                {
                    errors.Add(r.Error.Reason);
                    deadLetterQueue.Add(o);
                })
                .BackOffTime(TimeSpan.FromSeconds(5))
                .BackOffRetries(2)
                .Size(1000)
                .RefreshOnCompleted()
                .Index(indeksName)
                .BufferToBulk((r, buffer) => r.IndexMany(buffer))
            , tokenSource.Token);

        try
        {
            observableBulk.Wait(TimeSpan.FromMinutes(15), b =>
            {
                bulkAllResponses.Add(b);
                Interlocked.Increment(ref seenPages);
            });
        }
        catch (Exception e)
        {
            Console.WriteLine("Exxx => " + e.Message);
        }
        foreach (var err in errors)
        {
            Console.WriteLine("Error : " + err);
        }

我希望它可以帮助其他也有这个问题的人。


5
投票

这意味着

BulkAll
observable 无法索引一个或多个由于无法重试的原因而失败的文档。

默认情况下,无法索引的文档的重试谓词是当某个项目返回 HTTP 响应状态代码 429 时,即尝试同时索引多于集群能够处理的文档。

BulkAll()
设置中可以看出两件事:

  1. var bulk = _client.BulkAll(group.ToList(), a => a

    group.ToList()
    将立即评估所有文档并将它们缓冲在内存中的
    List<T>
    中。为了提高效率,您通常希望在批量索引时“懒惰”地枚举一个大集合。如果 group 是一个可以传递给
    IEnumerable<T>
    BulkAll
    ,那么就传递它。
    
    

  2. .Size(group.Count()));

    
    
    这将尝试在一次批量请求中发送

    所有文档

    BulkAll 的想法是,它将同时发送多个批量请求,并持续这样做,直到所有文档都被索引为止。

    
    
    Size应该针对每个请求设置为合理的大小;您可以通过计算每个文档的平均字节数来计算合理的大小(以字节为单位),然后从小于 5MB 的位置开始,或者您可能希望从每个请求 1000 个文档开始,并评估索引速度是否足以满足您的需求或者如果您开始收到返回的 429 响应。当后者开始发生时,这很好地表明您已接近正在索引的文档的集群的索引限制阈值。


1
投票

如果您已使用类似的方法在 Elastic 搜索中实现 API 安全性

POST /_security/api_key { "name":"my-api-key", "role_descriptors": { "admin": { "cluster":["all"], "index": [ { "names":["my-index", "my-other-index"], "privileges": ["all"] } ] } } }

并且您没有为这些索引使用正确的 ApiKey,或者尝试使用生成的 ApiKey 创建并将bulkAll 放入另一个索引,该索引没有为其定义 ApiKey。

而不是在删除/创建调用期间收到错误..

await client.Indices.DeleteAsync(IndexName); await client.Indices.CreateAsync(IndexName, MutateCreateIndexDescriptor);

您将收到您在问题中指定的错误。

BulkAll halted after receiving failures that can not be retried from _bulk

堆栈跟踪错误地指向:

bulkAll.Wait(... , ...);

如果已定义。

这可能是 NEST 中的一个错误。在版本中体验过。 7.8.0 希望这可以帮助任何偶然发现它的人。


0
投票

.EnableApiVersioningHeader()

在其设置中指定。

添加此行后,错误消失了。

© www.soinside.com 2019 - 2024. All rights reserved.