我怎么能动态地从MySQL与logstash(不重复)更新我的数据?

问题描述 投票:2回答:1

我已经配置logstash.conf插入我的数据库的动态数据,但问题是:

当我改变行我表的该行没有更新我的指数,因为我只sql_last_value后插入新的价值观,我虽然对我们的触发器,但我不知道我怎么能做到这一点。

input {
  jdbc { 
    jdbc_connection_string =>"jdbc:mysql://localhost:3306/blog"
    jdbc_user =>"root"
    jdbc_password =>""
    jdbc_driver_library =>"C:\Users\saidb\Downloads\mysql-connector-java-5.1.47\mysql-connector-java-5.1.47.jar"
    jdbc_driver_class =>"com.mysql.jdbc.Driver"
    schedule =>"* * * * *"
    statement =>"SELECT * FROM blog_pro WHERE id >:sql_last_value"
    use_column_value =>true
    tracking_column =>id
    }
  }
output {
  elasticsearch {
    hosts =>"localhost:9200"
    index =>"blog_pro"
    document_type =>"data"
  }
}
mysql elasticsearch jdbc logstash
1个回答
2
投票

如果您使用id选择行你不能做到这一点。你有2种选择,

  1. 选择所有行,每次使用查询SELECT * FROM blog_pro,我不认为是根据您的情况做一个好送他们到ES。
  2. 创建新列last_modified_time包含的记录(行)的最后一次修改的时间戳。然后用它来过滤行。注意,该酒店tracking_column_type => "timestamp"

statement =>"SELECT * FROM blog_pro WHERE last_modiefied_time >:sql_last_value" use_column_value =>true tracking_column =>last_modified_time tracking_column_type => "timestamp"

这里到处是logstash配置

input { 

 jdbc { 
    jdbc_connection_string =>"jdbc:mysql://192.168.3.57:3306/blog_pro"
    jdbc_user =>"dush"
    jdbc_password =>"dush"
    jdbc_driver_library =>"F:\logstash-6.2.2\bin\mysql-connector-java-5.1.6.jar"
    jdbc_driver_class =>"com.mysql.jdbc.Driver"
    schedule =>"* * * * *"
    statement =>"SELECT * FROM blog_pro WHERE last_modified_time  >:sql_last_value"
    use_column_value =>true
    tracking_column =>last_modified_time
    tracking_column_type => "timestamp"
    } 
 }

output 
{ 
    #output to elasticsearch    
    elasticsearch {
        hosts => [ "192.168.1.245:9201" ]
        action=>update
        # "%{id}" - > primary key of the table 
        document_id => "%{id}"
        doc_as_upsert =>true
    }

}

注意,您可能需要清除索引以及与此配置开始索引。我测试了这一点,并能正常工作。

Elasticsearch版本= 5.x.x

logstash版本6.2.2 =

© www.soinside.com 2019 - 2024. All rights reserved.