Elasticsearch 正在以非连续顺序处理来自 Logstash 的传入事件/消息

问题描述 投票:0回答:1

我们有一个将数据从 MongoDB 同步到 Elasticsearch 的系统。以下是关键组件:

  1. MongoDB 源连接器(Kafka 连接器):该组件从 MongoDB oplog 读取事件并生成有关 Kafka 主题的消息。
  2. Logstash:Logstash 使用来自 Kafka 的这些消息并将它们发送到 Elasticsearch。我们为 Logstash 配置了特定设置,包括 pipeline.workers: 1 和 pipeline.ordered: true,以确保事件按接收顺序进行处理。
  3. ElasticSearch:1 个节点集群,索引具有单个主分片。
正在使用的 ELK 堆栈的

版本8.7.1

问题: 我们有一个用例,其中以下操作在 mongoDB 中按顺序执行:

  1. 创建文档A
  2. 文件A更新
  3. 删除文档A

虽然我们可以看到这些操作在 Logstash 中按顺序处理,但我们遇到了一个问题:文档 A 的删除没有反映在 Elasticsearch 中。 Elasticsearch 文档中的

_version
为 3,表示所有 3 个事件均已执行。这表明删除操作可能会在更新操作之前处理,导致文档 A 在 MongoDB 中被删除后仍保留在 Elasticsearch 中。

logstash管道如下:

 input {
    kafka {
        id => "my_plugin_id"
        group_id => "logstash"
        bootstrap_servers => "broker:29092"
        topics => ["topic"]
        auto_offset_reset => "earliest"
        consumer_threads => 1
    }
}

filter {
    json {
        source => "message"
        target => "message"
        add_field => { "mongoId" => "%{[message][documentKey][_id][$oid]}" }
    }
}


output {
    if [message][operationType] == "delete" {
        elasticsearch {
        hosts => "http://es01:9200"
        user => "elastic"
        password => "changeme"
        index => "index_name"
        document_id => "%{[mongoId]}"
        action => "delete"
        }
    }
    else {
        elasticsearch {
        hosts => "http://es01:9200"
        index => "index_name"
        document_id => "%{[mongoId]}"
        user => "elastic"
        password => "changeme"
        pipeline => "index_pipeline"
        }
    }
}

注意:如上面的配置中所述,对于delete以外的其他操作,我们使用一个摄取管道来重构要索引的文档数据。

document_id
设置为mongoId。

一个假设,但有待验证: 据我所知,Elasticsearch 输出插件(在 Logstash 管道中使用)使用批量 API 将数据发送到 Elastic Search。在单个批次中处理的事件(子请求)是否可能不遵守严格的顺序,即删除可能在更新事件之前运行(或完成),因此弹性搜索中可见的最终文档对应于更新操作。 刷新索引设置为默认即 1 次/秒 另外,我将

pipeline.batch.delay
(logstash 管道配置)增加到 5000 毫秒(默认为 50 毫秒),以确保所有事件都在同一批次中。

elasticsearch logstash
1个回答
0
投票

将 pipeline.workers 设置为 1 并将ordered 设置为 true 可确保您的 Logstash 管道逐一处理文档。事件也一一到达输出层。

但是,由于您有两个不同的

elasticsearch
输出插件,因此两者都独立运行,并且每个插件都可以在任意时刻发送其批次,具体取决于缓冲区已填充的量或自上次发送以来的时间。

因此,在您的情况下可能会发生的情况是,首先发送删除批次,但不执行任何操作,因为没有具有该 ID 的文档,然后接下来发送创建/更新批次,从而有效地创建文档。

您可能应该做的是将过滤器部分内的临时 action 字段(例如在

[@metadata][action]
中)的值设置为
index
update
delete
,并在单个
elasticsearch 中使用该值
输出,像这样:

output {
    elasticsearch {
        hosts => "http://es01:9200"
        user => "elastic"
        password => "changeme"
        index => "index_name"
        document_id => "%{[mongoId]}"
        action => "%{[@metadata][action]}"
        pipeline => "index_pipeline"
    }
}

无论如何,您的管道不会对删除产生任何影响,因此那里应该不会有任何问题,或者如果有问题,请从输出中删除

pipeline
设置,并将其直接在索引设置中设置为
index.default_pipeline
财产。

尝试一下。

© www.soinside.com 2019 - 2024. All rights reserved.