Unable to capture multiline logs with Fluentd

Question (0 votes, 1 answer)

I am trying to build a centralized logging system with the EFK stack (Elasticsearch, Fluentd, Kibana). To collect logs from the Kubernetes nodes, I run Fluentd as a DaemonSet. With this setup I get single-line logs fine, but multiline logs are not captured as single records.

fluent.conf is loaded via a ConfigMap:

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config-map
  namespace: kube-system
  uid: ee7f05ea-b6fc-4b11-a0d6-0e6a709bf4c1
  
data:
      fluent.conf: |
          <source>
            @type prometheus
            @id in_prometheus
            bind "0.0.0.0"
            port 24231
            metrics_path "/metrics"
          </source>
          
          <source>
            @type prometheus_output_monitor
            @id in_prometheus_output_monitor
          </source>
          
          <label @FLUENT_LOG>
            <match fluent.**>
              @type null
              @id ignore_fluent_logs
            </match>
          </label>
          
          <source>
            @type tail
            @id in_tail_cluster_autoscaler
            multiline_flush_interval 5s
            path "/var/log/cluster-autoscaler.log"
            pos_file "/var/log/fluentd-cluster-autoscaler.log.pos"
            tag "cluster-autoscaler"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_container_logs
            path "/var/log/containers/*.log"
            pos_file "/var/log/fluentd-containers.log.pos"
            tag "kubernetes.*"
            exclude_path /var/log/containers/fluent*
            read_from_head true
            <parse>
              @type multi_format
              <pattern>
                format multiline
                format_firstline /^\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z ?(stdout|stderr) \w \d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}/
                format1 /^(?<time>\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z) (?<stream>stdout|stderr) ((?<logtag>.))? (?<logtime>\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}) (?<log>.*)/
              </pattern>
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_docker
            path "/var/log/docker.log"
            pos_file "/var/log/fluentd-docker.log.pos"
            tag "docker"
            <parse>
              @type "regexp"
              expression /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=(?<status_code>\d+))?/
              unmatched_lines
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_etcd
            path "/var/log/etcd.log"
            pos_file "/var/log/fluentd-etcd.log.pos"
            tag "etcd"
            <parse>
              @type "none"
              unmatched_lines
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_glbc
            multiline_flush_interval 5s
            path "/var/log/glbc.log"
            pos_file "/var/log/fluentd-glbc.log.pos"
            tag "glbc"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kube_apiserver_audit
            multiline_flush_interval 5s
            path "/var/log/kubernetes/kube-apiserver-audit.log"
            pos_file "/var/log/kube-apiserver-audit.log.pos"
            tag "kube-apiserver-audit"
            <parse>
              @type "multiline"
              format_firstline "/^\\S+\\s+AUDIT:/"
              format1 /^(?<time>\S+) AUDIT:(?: (?:id="(?<id>(?:[^"\\]|\\.)*)"|ip="(?<ip>(?:[^"\\]|\\.)*)"|method="(?<method>(?:[^"\\]|\\.)*)"|user="(?<user>(?:[^"\\]|\\.)*)"|groups="(?<groups>(?:[^"\\]|\\.)*)"|as="(?<as>(?:[^"\\]|\\.)*)"|asgroups="(?<asgroups>(?:[^"\\]|\\.)*)"|namespace="(?<namespace>(?:[^"\\]|\\.)*)"|uri="(?<uri>(?:[^"\\]|\\.)*)"|response="(?<response>(?:[^"\\]|\\.)*)"|\w+="(?:[^"\\]|\\.)*"))*/
              time_format "%Y-%m-%dT%T.%L%Z"
              unmatched_lines
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kube_apiserver
            multiline_flush_interval 5s
            path "/var/log/kube-apiserver.log"
            pos_file "/var/log/fluentd-kube-apiserver.log.pos"
            tag "kube-apiserver"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kube_controller_manager
            multiline_flush_interval 5s
            path "/var/log/kube-controller-manager.log"
            pos_file "/var/log/fluentd-kube-controller-manager.log.pos"
            tag "kube-controller-manager"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kube_proxy
            multiline_flush_interval 5s
            path "/var/log/kube-proxy.log"
            pos_file "/var/log/fluentd-kube-proxy.log.pos"
            tag "kube-proxy"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kube_scheduler
            multiline_flush_interval 5s
            path "/var/log/kube-scheduler.log"
            pos_file "/var/log/fluentd-kube-scheduler.log.pos"
            tag "kube-scheduler"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kubelet
            multiline_flush_interval 5s
            path "/var/log/kubelet.log"
            pos_file "/var/log/fluentd-kubelet.log.pos"
            tag "kubelet"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_rescheduler
            multiline_flush_interval 5s
            path "/var/log/rescheduler.log"
            pos_file "/var/log/fluentd-rescheduler.log.pos"
            tag "rescheduler"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_minion
            path "/var/log/salt/minion"
            pos_file "/var/log/fluentd-salt.pos"
            tag "salt"
            <parse>
              @type "regexp"
              expression /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
              time_format "%Y-%m-%d %H:%M:%S"
              unmatched_lines
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_startupscript
            path "/var/log/startupscript.log"
            pos_file "/var/log/fluentd-startupscript.log.pos"
            tag "startupscript"
            <parse>
              @type "syslog"
              unmatched_lines
            </parse>
          </source>
          
          <filter kubernetes.**>
            @type kubernetes_metadata
            @id filter_kube_metadata
            kubernetes_url "https://x.x.x.x:443/api"
            verify_ssl true
            ca_file ""
            skip_labels false
            skip_container_metadata false
            skip_master_url false
            skip_namespace_metadata false
            watch true
          </filter>
          
          <match **>
            @type elasticsearch
            @id out_es
            @log_level "info"
            include_tag_key true
            host "x.x.x.x"
            port 9200
            path ""
            scheme https
            ssl_verify false
            ssl_version TLSv1_2
            user "kibana10"
            password poorab
            reload_connections false
            reconnect_on_error true
            reload_on_failure true
            log_es_400_reason false
            logstash_prefix "logstash"
            logstash_dateformat "%Y.%m.%d"
            logstash_format true
            index_name "logstash"
            target_index_key
            type_name "fluentd"
            include_timestamp false
            template_name
            template_file
            template_overwrite false
            sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
            request_timeout 5s
            application_name default
            <buffer>
              flush_thread_count 8
              flush_interval 5s
              chunk_limit_size 2M
              queue_limit_length 32
              retry_max_interval 30
              retry_forever true
            </buffer>
          </match>

The DaemonSet deployment file:


kind: DaemonSet
apiVersion: apps/v1
metadata:
  name: fluentd
  namespace: kube-system
  labels:
    k8s-app: fluentd-logging
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-logging
      version: v1
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      serviceAccount: fluentd
      serviceAccountName: fluentd
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
      volumes:
        - name: fluentd-config-map
          configMap:
            name: fluentd-config-map
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers
      containers:
        - name: fluentd
          image: 'fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8-2'
          env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: x.x.x.x
            - name: FLUENT_ELASTICSEARCH_PORT
              value: '9200'
            - name: FLUENT_ELASTICSEARCH_SCHEME
              value: https
            - name: FLUENT_ELASTICSEARCH_SSL_VERIFY
              value: 'false'
            - name: FLUENT_ELASTICSEARCH_SSL_VERSION
              value: TLSv1_2
            - name: FLUENT_ELASTICSEARCH_USER
              value: kibana
            - name: FLUENT_ELASTICSEARCH_PASSWORD
              value: password
            - name: FLUENT_UID
              value: "0"
            - name: FLUENTD_SYSTEMD_CONF
              value: disable
            - name: FLUENT_CONTAINER_TAIL_EXCLUDE_PATH
              value: /var/log/containers/fluent*
            - name: FLUENT_CONTAINER_TAIL_PARSER_TYPE
              value: /^(?<time>.+) (?<stream>stdout|stderr)( (?<logtag>.))? (?<log>.*)$/
          resources:
            limits:
              cpu: 100m
              memory: 200Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: fluentd-config-map
              mountPath: /fluentd/etc/fluent.conf
              subPath: fluent.conf
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              readOnly: true
              mountPath: /var/lib/docker/containers

The log that I am trying to ingest into Elasticsearch as a single record:


2024-01-29 16:16:41.255 ERROR 1 --- [-StreamThread-1] c.o.b.d.p.app.transforms.ProcessorImpl   : Exception while parsing string {
  "sourceServer": "EC2AMAZ-TET8NM1",
  "timestamp": "2024-01-29T16:16:41.113Z",
  "messageType": "trackerLogRecord",
  "trackerLogRecord": {
    "genericAttributes": {
      "synergyDeviceId": "aaaa",
      "timestamp": "2024-01-29T16:15:37.390Z",
      "rawTimestamp": "303d81f2a7",
      "physicalOffset": gggg,
      "logicalOffset": 18,
      "totalRecordLen": 1,
      "type": 6,
      "extendedType": 2,
      "payloadLen": 3,
      "rPayload": "100"
    },
    "recordType": "Temperature",
    "BoardTemperature": {
      "boardTemperature": 2262.0
    }
  }
}, Cannot deserialize value of type `LogRecordType` from String "BoardTemperature": not one of the values accepted for Enum class: [trackerVersion ]
 at [Source: (String)"{
  "sourceServer": "xxxxxxx",
  "timestamp": "2024-01-29T16:16:41.113Z",
  "messageType": "LogRecord",
  "trackerLogRecord": {
    "genericAttributes": {
      "synergyDeviceId": "aaaa",
      "timestamp": "2024-01-29T16:15:37.390Z",
      "rawTimestamp": "303d81f2a7",
      "physicalOffset": gggg,
      "logicalOffset": 18,
      "totalRecordLen": 1,
      "type": 6,
      "extendedType": 2,
      "payloadLen": 3,
      "rPayload": "100"[truncated 148 chars]; line: 19, column: 19] (through reference chain: record["LogRecord"]->LogRecord["recordType"])
 [Ljava.lang.StackTraceElement;@315370cc
2024-01-29 16:16:41.255  INFO 1 --- [-StreamThread-1] c.o.b.d.p.app.kafka.producer.Producer    : Inserting record to: error

I have checked the regex and it is fine. When the records are scraped from the Kubernetes pods, every line carries an extra prefix (2024-01-30T08:54:55.519393497Z stdout F 2024-01-30 08:54:55,519 INFO 1 --- [scheduling-1] c.o.b.l.d.ExceptionLogger :), which I have accounted for.

*** While scraping logs from the k8s pods, three extra fields are prepended to every line: time, stream, and logtag, e.g. 2024-01-30T08:54:55.519393497Z stdout F ... *** These are accounted for in the regex and matching works for them.
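For reference, under a containerd/CRI runtime such a line from /var/log/containers/*.log breaks down as follows (the F/P semantics come from the CRI logging format):

time:   2024-01-30T08:54:55.519393497Z   (prepended by the container runtime, RFC3339Nano)
stream: stdout                           (stdout or stderr)
logtag: F                                (F = full line, P = partial line)
log:    2024-01-30 08:54:55,519 INFO 1 --- [scheduling-1] c.o.b.l.d.ExceptionLogger : ...   (the application's own line)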

Below is an explanation of the multiline format_firstline regex:

format_firstline /^\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z ?(stdout|stderr) \w \d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}/


Here the first date regex matches the timestamp prepended to every line; it is present on every record.
The second field is the stream (stdout or stderr); it is also present on every record.
The third is the single-character logtag; it is also present on every record.
The fourth date is the actual timestamp received in the logs, and it marks the start of an exception, since the remaining lines of the exception do not contain this date.
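For example, against the exception above, only its first line contains the second (application) timestamp and therefore matches format_firstline; the continuation lines carry only the prefix, so they should be appended to the open record (the prefix timestamps here are illustrative):

2024-01-29T16:16:41.260Z stdout F 2024-01-29 16:16:41.255 ERROR 1 --- [-StreamThread-1] c.o.b.d.p.app.transforms.ProcessorImpl   : Exception while parsing string {   <-- matches format_firstline, starts a new record
2024-01-29T16:16:41.260Z stdout F   "sourceServer": "EC2AMAZ-TET8NM1",   <-- no second timestamp, continuation line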


Currently I am able to capture only the first line of the exception, or, if I change the regex, I receive every line of the exception as an individual record. I need to capture the complete exception as a single record in Elasticsearch.

*** Thanks in advance ***
elasticsearch kubernetes logging fluentd
1 Answer
0 votes

If all of your logs start with the same data format, I would suggest not parsing the logs in the source directive. Instead, use the concat filter to concatenate them first, and parse them afterwards.

<source>
  @type tail
  @id in_tail_container_logs
  path "/var/log/containers/*.log"
  pos_file "/var/log/fluentd-containers.log.pos"
  tag "kubernetes.*"
  exclude_path /var/log/containers/fluent*
  read_from_head true
  <parse>
    @type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
    time_key time
    time_format %Y-%m-%dT%H:%M:%S.%NZ
    time_type string
  </parse>
</source>
<filter kubernetes.**>
  @type concat
  key log
  multiline_start_regexp /^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\](.*)\n$/
  continuous_line_regexp /^(?!\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\])(.*)\n$/
  separator ""
</filter>
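Note that multiline_start_regexp above assumes lines that begin with a bracketed timestamp. For the Spring Boot-style lines shown in the question, a sketch along these lines should be closer (the pattern and the timeout are assumptions to adjust to your exact format):

<filter kubernetes.**>
  @type concat
  key log
  # Spring Boot lines start with "YYYY-MM-DD HH:MM:SS.mmm" (or with a comma before the millis)
  multiline_start_regexp /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[.,]\d{3}/
  # flush a pending record if no new line arrives within 5 seconds;
  # without timeout_label, timed-out records go through the error stream
  flush_interval 5
  separator ""
</filter>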

You can then parse the concatenated record later, as needed.
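For example, once the whole exception is a single record, a parser filter can split out the application fields. This is a minimal sketch assuming the Spring Boot format from the question; the field names are illustrative:

<filter kubernetes.**>
  @type parser
  key_name log
  # keep the original fields alongside the parsed ones
  reserve_data true
  # do not route unmatched records to the error stream; with reserve_data they pass through unchanged
  emit_invalid_record_to_error false
  <parse>
    @type regexp
    expression /^(?<logtime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[.,]\d{3})\s+(?<severity>\w+)\s+(?<pid>\d+) --- \[(?<thread>[^\]]+)\] (?<class>\S+)\s*: (?<message>.*)/m
  </parse>
</filter>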
