文件输入add_field未向每一行添加字段

问题描述 投票:0回答:2

我正在使用 Logstash 配置解析不同负载平衡服务器集群的多个日志文件,并希望向每个文件的条目添加一个“log_origin”字段,以便以后轻松过滤。

这是我的输入->文件配置的一个简单示例:

input {
  file {
    type => "node1"
    path => "C:/Development/node1/log/*"
    add_field => [ "log_origin", "live_logs" ]
  }
  file {
    type => "node2"
    path => "C:/Development/node2/log/*"
    add_field => [ "log_origin", "live_logs" ]
  }
  file {
    type => "node3"
    path => "C:/Development/node1/log/*"
    add_field => [ "log_origin", "live_logs" ]
  }
  file {
    type => "node4"
    path => "C:/Development/node1/log/*"
    add_field => [ "log_origin", "live_logs" ]
  }
}

filter {
    grok {
        match => [
            "message","%{DATESTAMP:log_timestamp}%{SPACE}\[%{DATA:class}\]%{SPACE}%{LOGLEVEL:loglevel}%{SPACE}%{GREEDYDATA:log_message}"
        ]
    }

    date { 
        match => [ "log_timestamp",  "dd.MM.YY HH:mm:ss", "ISO8601" ]
        target => "@timestamp"
    }

    mutate {
        lowercase => ["loglevel"]
        strip     => ["loglevel"]
    }

    if "_grokparsefailure" in [tags] {
        multiline {
            pattern   => ".*"
            what      => "previous"
        }
    }

    if[fields.log_origin] == "live_logs"{
        if [type] == "node1" {
            mutate { 
                add_tag => "realsServerName1"
            }
        }
        if [type] == "node2" {
            mutate { 
                add_tag => "realsServerName2"
            }
        }
        if [type] == "node3" {
            mutate { 
                add_tag => "realsServerName3"
             }
        }
        if [type] == "node4" {
            mutate { 
                add_tag => "realsServerName4"
            }
        }
    }
}

output {
  stdout { }
  elasticsearch { embedded => true }
}

我本来希望logstash 添加这个字段,并为其找到的每个logentry 赋予值,但事实并非如此。也许我在这里完全采取了错误的方法?

编辑:我无法直接从节点检索日志,但必须将它们复制到我的“服务器”。否则我将能够仅使用文件路径来区分不同的集群...

编辑:它正在工作。我应该在两者之间清理我的数据。没有添加字段的旧条目使我的结果变得混乱。

logstash
2个回答
7
投票

add_field 需要一个哈希值。应该是

add_field => {
 "log_origin" => "live_logs" 
}

0
投票

我使用logstash 8.13版本,下面的配置对我有用

logstash.conf

input {
    file {
      mode => "read"
      exit_after_read => true # this tells logstash to exit after reading the file.  This is useful for running logstash as a "job". if you want logstash to continue to run and monitor for files, remove this line.
      file_completed_action => "log" # this tells logstash to log to the file specified in file_completed_log_path once its done reading the input file.
      file_completed_log_path => "/usr/share/logstash/ingest_data/logstash_completed.log"
      path => "/usr/share/logstash/ingest_data/*.log"
      add_field => {
        "name" => "log-error" 
      }
  }
}

filter {
  if [name] == "log-error" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:log_level}" }
      add_field => [ "received_at", "%{@timestamp}" ]
    }
    date {
      match => [ "log_timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
    }
}
}

output {
  if [name] == "log-error" {
    elasticsearch {
      index => "log-error-%{+YYYY.MM.dd}"
      hosts=> "${ELASTIC_HOSTS}"
      user=> "${ELASTIC_USER}"
      password=> "${ELASTIC_PASSWORD}"
      cacert=> "certs/ca/ca.crt"
    }
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.