I am using a Logstash configuration to parse several log files from different load-balanced server clusters, and I want to add a "log_origin" field to the entries from each file so I can filter on it easily later.
Here is a simple example of my input -> file configuration:
input {
  file {
    type => "node1"
    path => "C:/Development/node1/log/*"
    add_field => [ "log_origin", "live_logs" ]
  }
  file {
    type => "node2"
    path => "C:/Development/node2/log/*"
    add_field => [ "log_origin", "live_logs" ]
  }
  file {
    type => "node3"
    path => "C:/Development/node3/log/*"
    add_field => [ "log_origin", "live_logs" ]
  }
  file {
    type => "node4"
    path => "C:/Development/node4/log/*"
    add_field => [ "log_origin", "live_logs" ]
  }
}
filter {
  grok {
    match => [
      "message", "%{DATESTAMP:log_timestamp}%{SPACE}\[%{DATA:class}\]%{SPACE}%{LOGLEVEL:loglevel}%{SPACE}%{GREEDYDATA:log_message}"
    ]
  }
  date {
    match => [ "log_timestamp", "dd.MM.YY HH:mm:ss", "ISO8601" ]
    target => "@timestamp"
  }
  mutate {
    lowercase => ["loglevel"]
    strip => ["loglevel"]
  }
  if "_grokparsefailure" in [tags] {
    multiline {
      pattern => ".*"
      what => "previous"
    }
  }
  if [log_origin] == "live_logs" {
    if [type] == "node1" {
      mutate {
        add_tag => "realsServerName1"
      }
    }
    if [type] == "node2" {
      mutate {
        add_tag => "realsServerName2"
      }
    }
    if [type] == "node3" {
      mutate {
        add_tag => "realsServerName3"
      }
    }
    if [type] == "node4" {
      mutate {
        add_tag => "realsServerName4"
      }
    }
  }
}
output {
  stdout { }
  elasticsearch { embedded => true }
}
I would have expected Logstash to add this field with its value to every log entry it finds, but it doesn't. Maybe my whole approach here is wrong?
Edit: I can't retrieve the logs directly from the nodes; I have to copy them over to my "server" first. Otherwise I could just use the file path to tell the different clusters apart...
Edit: It's working now. I should have cleaned up my data in between. Old entries without the added field were cluttering up my results.
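For anyone hitting the same problem: a quick way to confirm that newly read entries actually carry the field is a throwaway stdout output with the rubydebug codec, which prints each event with all of its fields:

output {
  stdout { codec => rubydebug }  # shows every event in full, including log_origin
}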
add_field takes a hash. It should be:
add_field => {
  "log_origin" => "live_logs"
}
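Applied to the question's input section, each file block then becomes (a sketch using the same type and path as above; repeat for the remaining nodes):

input {
  file {
    type => "node1"
    path => "C:/Development/node1/log/*"
    # add_field takes a hash of field names to values
    add_field => {
      "log_origin" => "live_logs"
    }
  }
  # ... same pattern for node2, node3, and node4 with their own paths
}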
I am using Logstash version 8.13, and the configuration below works for me.
logstash.conf
input {
  file {
    # Read each file once from the beginning instead of tailing it.
    mode => "read"
    # Exit once the file has been read; useful for running Logstash as a one-off "job".
    # Remove this line if you want Logstash to keep running and monitor for new files.
    exit_after_read => true
    # Once a file is fully read, log it to the path given below.
    file_completed_action => "log"
    file_completed_log_path => "/usr/share/logstash/ingest_data/logstash_completed.log"
    path => "/usr/share/logstash/ingest_data/*.log"
    add_field => {
      "name" => "log-error"
    }
  }
}
filter {
  if [name] == "log-error" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:log_level}" }
      add_field => { "received_at" => "%{@timestamp}" }
    }
    date {
      match => [ "log_timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
    }
  }
}
output {
  if [name] == "log-error" {
    elasticsearch {
      index => "log-error-%{+YYYY.MM.dd}"
      hosts => "${ELASTIC_HOSTS}"
      user => "${ELASTIC_USER}"
      password => "${ELASTIC_PASSWORD}"
      cacert => "certs/ca/ca.crt"
    }
  }
}
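If you instead want Logstash to keep running and pick up new lines as they are written, you can drop the read-mode settings and let the file input run in its default tail mode. A minimal sketch, assuming the same ingest path (the sincedb_path location here is just an example):

input {
  file {
    mode => "tail"  # default mode: follow files and read new lines as they appear
    path => "/usr/share/logstash/ingest_data/*.log"
    sincedb_path => "/usr/share/logstash/data/sincedb"  # remembers read positions across restarts
    add_field => {
      "name" => "log-error"
    }
  }
}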