How do I connect Elasticsearch and Postgres using Logstash?

Question  Votes: 0  Answers: 2

I can't get Logstash to transfer data from Postgres to Elasticsearch. It fails with an error whose backtrace ends at "block in converge_state". The full error is:

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of \r, \n at line 36, column 4 (byte 583) after # }", :backtrace=>["/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2577:in `map'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/java_pipeline.rb:24:in `initialize'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/agent.rb:325:in `block in converge_state'"]}

My configuration file is as follows:

input {
    jdbc {

        jdbc_connection_string => "jdbc:postgresql://localhost:5432/school"


        jdbc_user => "postgres"
        jdbc_password => "postgres"


        jdbc_driver_library => "/Users/karangupta/Downloads/postgresql-42.2.8.jar"


        jdbc_driver_class => "org.postgresql.Driver"

        statement => "select sch.udise_sch_code,
    sch.school_name,
    e.edu_block_name,
    d.district_name,
    s.state_name,

    sch.school_address,
    sch.pincode,
    case when sch.sch_loc_r_u = 1 then 'rural'
        when sch.sch_loc_r_u = 2 then 'urban' end as "sch_location_type",
    sch.assembly_const,
    sch.status,

    schd.academic_year,
    schd.sch_category,
    schd.class_frm,
    schd.class_to,

    case when schd.sch_type = 1 then 'boys' 
          when schd.sch_type = 2 then 'girls' 
          else 'coed' end as "school_type",

    case when sde.is_cce_ele = 1 then 'yes'
        when sde.is_cce_ele = 2 then 'no' 
        else 'NA' end as "cce_implemented_ele",
    case when sde.is_pcr_maintained = 1 then 'yes'
        when sde.is_pcr_maintained = 2 then 'no' 
        else 'NA' end as "cce_implemented_ele",
    case when sde.is_pcr_shared_parents = 1 then 'yes'
        when sde.is_pcr_shared_parents = 2 then 'no' 
        else 'NA' end as "is_pcr_shared_parents",
    case when sde.is_cce_sec = 1 then 'yes'
        when sde.is_cce_sec = 2 then 'no' 
        else 'NA' end as "cce_implemented_sec",
    case when sde.is_cce_hrsec = 1 then 'yes'
        when sde.is_cce_hrsec = 2 then 'no' 
        else 'NA' end as "cce_implemented_hrsec",
    sde.rte_25p_applied,
    sde.rte_25p_enrolled_previous_yr,
    sde.is_smc,
    sde.smc_tot_m,
    sde.smc_tot_f,
    sde.sms_parents_m,
    sde.sms_parents_f,
    sde.smc_lgb_m
from mst_school_2017_18 sch 
inner join mst_edu_block_2017_18 e on sch.udise_edu_block_code = e.udise_edu_block_code
inner join mst_district_2017_18 d on e.udise_dist_code = d.udise_district_code
inner join mst_state_2017_18 s on d.udise_state_code = s.udise_state_code
inner join school_details_2017_18 schd on sch.udise_sch_code = schd.udise_sch_code
inner join school_details_extended_2017_18 sde on sch.udise_sch_code = sde.udise_sch_code"
    }
}


output {
    stdout { codec => json_lines }
}

# output {
#   elasticsearch {
#       index => "schoolDetails"
#       hosts => "http://localhost:9200"
#   }
# }

I don't know what I'm doing wrong.

elasticsearch jdbc logstash logstash-jdbc
2 Answers
0
votes

I suggest you store your multi-line SQL statement in a file and then reference that file in your configuration.
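
A minimal sketch of that approach, assuming the jdbc input plugin's `statement_filepath` option is used in place of `statement`. The file path below is hypothetical; the other settings are copied from the question.

```conf
input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/school"
        jdbc_user => "postgres"
        jdbc_password => "postgres"
        jdbc_driver_library => "/Users/karangupta/Downloads/postgresql-42.2.8.jar"
        jdbc_driver_class => "org.postgresql.Driver"

        # Hypothetical path: save the full multi-line SELECT from the
        # question in this file, without the surrounding double quotes.
        statement_filepath => "/path/to/school_details.sql"
    }
}

output {
    stdout { codec => json_lines }
}
```

Since the SQL then lives outside the pipeline file, the double-quoted column aliases in the query no longer sit inside a double-quoted config string.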


0
votes

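The JDBC input plugin expects the SQL statement to be on a single line, with no line breaks. Replace every newline in the statement with a space and it should work.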
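
As a rough sketch of the single-line form, here is a shortened version of the question's query (three columns and one join, picked only for illustration). It also drops the double quotes around the column alias. That part is an assumption not stated in this answer: the statement itself is wrapped in double quotes, so an inner `"` may end the string early. PostgreSQL does not need quotes around a lowercase alias such as `sch_location_type`.

```conf
input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/school"
        jdbc_user => "postgres"
        jdbc_password => "postgres"
        jdbc_driver_library => "/Users/karangupta/Downloads/postgresql-42.2.8.jar"
        jdbc_driver_class => "org.postgresql.Driver"

        # Shortened query on one line; the alias is left unquoted (assumption, see above).
        statement => "select sch.udise_sch_code, sch.school_name, case when sch.sch_loc_r_u = 1 then 'rural' when sch.sch_loc_r_u = 2 then 'urban' end as sch_location_type from mst_school_2017_18 sch inner join mst_edu_block_2017_18 e on sch.udise_edu_block_code = e.udise_edu_block_code"
    }
}
```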
