在Logstash中过滤jdbc数据

问题描述 投票:0回答:1

在我的数据库中,我的数据格式如下:

enter image description here

但是在ElasticSearch中,我想针对项目类型推送数据。因此,ElasticSearch中的每条记录将列出所有项目名称及其每个项目类型的值。

喜欢这个:

{
  "_index": "daily_needs",
  "_type": "id",
  "_id": "10",
  "_source": {
    "item_type: "10",
    "fruits": "20",
    "veggies": "32",
    "butter": "11",
  }
}

{
  "_index": "daily_needs",
  "_type": "id",
  "_id": "11",
  "_source": {
    "item_type: "11",
    "hair gel": "50",
    "shampoo": "35",
  }
}

{
  "_index": "daily_needs",
  "_type": "id",
  "_id": "12",
  "_source": {
    "item_type: "12",
    "tape": "9",
    "10mm screw": "7",
    "blinker fluid": "78",
  }
}

我可以在Logstash中实现吗?

我是Logstash的新手,但据我了解,可以在filter中完成。但是我不确定要使用哪个过滤器,还是必须为此创建自定义过滤器。

当前conf示例:

input {
  jdbc {
    jdbc_driver_library => "ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "myjdbc-configs"
    jdbc_user => "dbuser"
    jdbc_password => "dbpasswd"
    schedule => "* * * * *"
    statement => "SELECT * from item_table"
  }
}
filter {
    ## WHAT TO WRITE HERE??
}
output {
    elasticsearch {
        hosts => [ "http://myeshost/" ]
        index => "myindex"
    }
}

请提示。 谢谢。

elasticsearch jdbc filter logstash
1个回答
0
投票

您可以使用aggregate filter plugin实现。我没有在下面进行测试,但是应该给您一个想法。

 filter {     
      aggregate {
        task_id => "%{item_type}" #
        code => "
          map['Item_type'] = event.get('Item_type')
          map[event.get('Item_Name')] = map[event.get('Item_Value')]
        "
        push_previous_map_as_event => true
        timeout => 3600
        timeout_tags => ['_aggregatetimeout']
      }
      if "aggregated" not in [tags] {
        drop {}
      }
    }

使用聚合过滤器的重要警告

  • SQL查询必须按Item_Type对结果进行排序,因此事件不会出现混乱。
  • SQL查询中的列名应与过滤器map[]中的列名匹配。>
  • 您应仅使用一个工作线程进行聚合,否则事件可能会被按顺序处理,并且会发生意外结果。
© www.soinside.com 2019 - 2024. All rights reserved.