在我的数据库中,我的数据格式如下:
但是在ElasticSearch
中,我想针对项目类型推送数据。因此,ElasticSearch中的每条记录将列出所有项目名称及其每个项目类型的值。
喜欢这个:
{
"_index": "daily_needs",
"_type": "id",
"_id": "10",
"_source": {
"item_type: "10",
"fruits": "20",
"veggies": "32",
"butter": "11",
}
}
{
"_index": "daily_needs",
"_type": "id",
"_id": "11",
"_source": {
"item_type: "11",
"hair gel": "50",
"shampoo": "35",
}
}
{
"_index": "daily_needs",
"_type": "id",
"_id": "12",
"_source": {
"item_type: "12",
"tape": "9",
"10mm screw": "7",
"blinker fluid": "78",
}
}
我可以在Logstash
中实现吗?
我是Logstash的新手,但据我了解,可以在filter
中完成。但是我不确定要使用哪个过滤器,还是必须为此创建自定义过滤器。
当前conf示例:
input {
jdbc {
jdbc_driver_library => "ojdbc6.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "myjdbc-configs"
jdbc_user => "dbuser"
jdbc_password => "dbpasswd"
schedule => "* * * * *"
statement => "SELECT * from item_table"
}
}
filter {
## WHAT TO WRITE HERE??
}
output {
elasticsearch {
hosts => [ "http://myeshost/" ]
index => "myindex"
}
}
请提示。 谢谢。
您可以使用aggregate filter plugin实现。我没有在下面进行测试,但是应该给您一个想法。
filter {
aggregate {
task_id => "%{item_type}" #
code => "
map['Item_type'] = event.get('Item_type')
map[event.get('Item_Name')] = map[event.get('Item_Value')]
"
push_previous_map_as_event => true
timeout => 3600
timeout_tags => ['_aggregatetimeout']
}
if "aggregated" not in [tags] {
drop {}
}
}
使用聚合过滤器的重要警告:
map[]
中的列名匹配。>