我们有一个将数据从 MongoDB 同步到 Elasticsearch 的系统。以下是关键组件:
版本:8.7.1
问题: 我们有一个用例,其中以下操作在 mongoDB 中按顺序执行:
虽然我们可以看到这些操作在 Logstash 中按顺序处理,但我们遇到了一个问题:文档 A 的删除没有反映在 Elasticsearch 中。 Elasticsearch 文档中的
_version
为 3,表示所有 3 个事件均已执行。这表明删除操作可能会在更新操作之前处理,导致文档 A 在 MongoDB 中被删除后仍保留在 Elasticsearch 中。
logstash管道如下:
input {
kafka {
id => "my_plugin_id"
group_id => "logstash"
bootstrap_servers => "broker:29092"
topics => ["topic"]
auto_offset_reset => "earliest"
consumer_threads => 1
}
}
filter {
json {
source => "message"
target => "message"
add_field => { "mongoId" => "%{[message][documentKey][_id][$oid]}" }
}
}
output {
if [message][operationType] == "delete" {
elasticsearch {
hosts => "http://es01:9200"
user => "elastic"
password => "changeme"
index => "index_name"
document_id => "%{[mongoId]}"
action => "delete"
}
}
else {
elasticsearch {
hosts => "http://es01:9200"
index => "index_name"
document_id => "%{[mongoId]}"
user => "elastic"
password => "changeme"
pipeline => "index_pipeline"
}
}
}
注意:如上面的配置中所述,对于delete以外的其他操作,我们使用一个摄取管道来重构要索引的文档数据。
document_id
设置为mongoId。
一个假设,但有待验证: 据我所知,Elasticsearch 输出插件(在 Logstash 管道中使用)使用批量 API 将数据发送到 Elastic Search。在单个批次中处理的事件(子请求)是否可能不遵守严格的顺序,即删除可能在更新事件之前运行(或完成),因此弹性搜索中可见的最终文档对应于更新操作。 刷新索引设置为默认即 1 次/秒 另外,我将
pipeline.batch.delay
(logstash 管道配置)增加到 5000 毫秒(默认为 50 毫秒),以确保所有事件都在同一批次中。
将 pipeline.workers 设置为 1 并将ordered 设置为 true 可确保您的 Logstash 管道逐一处理文档。事件也一一到达输出层。
但是,由于您有两个不同的
elasticsearch
输出插件,因此两者都独立运行,并且每个插件都可以在任意时刻发送其批次,具体取决于缓冲区已填充的量或自上次发送以来的时间。
因此,在您的情况下可能会发生的情况是,首先发送删除批次,但不执行任何操作,因为没有具有该 ID 的文档,然后接下来发送创建/更新批次,从而有效地创建文档。
您可能应该做的是将过滤器部分内的临时 action 字段(例如在
[@metadata][action]
中)的值设置为 index
、update
或 delete
,并在单个 elasticsearch
中使用该值
输出,像这样:
output {
elasticsearch {
hosts => "http://es01:9200"
user => "elastic"
password => "changeme"
index => "index_name"
document_id => "%{[mongoId]}"
action => "%{[@metadata][action]}"
pipeline => "index_pipeline"
}
}
无论如何,您的管道不会对删除产生任何影响,因此那里应该不会有任何问题,或者如果有问题,请从输出中删除
pipeline
设置,并将其直接在索引设置中设置为 index.default_pipeline
财产。
尝试一下。