Custom OFFSET condition in Logstash for Postgres


I upload part of the data from Postgres to Elasticsearch using the jdbc input in Logstash. Is it possible to configure Logstash to use a WHERE condition instead of OFFSET?

My configuration:

input {
  file {
    path => "/var/log/logstash/logstash-plain.log"
    type => "logstash-logs"
    start_position => "beginning"
  }

  jdbc {
    jdbc_driver_library => "/usr/share/logstash/external_jars/postgresql-42.5.4.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://172.17.0.1:5432/tyver_stage"
    jdbc_user => "tyver_stage"
    jdbc_password => "password"
    schedule => "*/5 * * * *"
    statement => "
      SELECT
        c.*,
        COALESCE(c.updated_at, c.created_at) AS order_column,
        CASE
          WHEN ARRAY_AGG(ucv.user_id) = ARRAY[null]::integer[] THEN ARRAY[]::integer[]
          ELSE ARRAY_AGG(ucv.user_id)
        END AS viewed_by
      FROM
        creatives as c
          LEFT JOIN
        user_creative_views ucv ON ucv.creative_id = c.id
      WHERE
        (c.updated_at >= :sql_last_value OR (c.updated_at IS NULL AND c.created_at >= :sql_last_value))
      GROUP BY
        c.id
      ORDER BY
        COALESCE(c.updated_at, c.created_at) ASC
    "
    use_column_value => true
    tracking_column => "order_column"
    tracking_column_type => "timestamp"
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    record_last_run => true
    clean_run => false
  }
}

filter {

}

output {
  elasticsearch {
    hosts => ["172.17.0.1:9200"]
    index => "tyver_index_creatives"
    document_id => "%{id}"
  }
}

As a result, Logstash issues the following SQL query:

 SELECT * FROM (
      SELECT
        c.*,
        COALESCE(c.updated_at, c.created_at) AS order_column,
        CASE
          WHEN ARRAY_AGG(ucv.user_id) = ARRAY[null]::integer[] THEN ARRAY[]::integer[]
          ELSE ARRAY_AGG(ucv.user_id)
        END AS viewed_by
      FROM
        creatives as c
          LEFT JOIN
        user_creative_views ucv ON ucv.creative_id = c.id
      WHERE
        (c.updated_at >= '1970-01-01 00:00:00.000000+0000' OR (c.updated_at IS NULL AND c.created_at >= '1970-01-01 00:00:00.000000+0000'))
      GROUP BY
        c.id
      ORDER BY
        COALESCE(c.updated_at, c.created_at) ASC
    ) AS "t1" LIMIT 10000 OFFSET 20000

This is very resource-intensive: a query with LIMIT 10000 OFFSET 20000 costs roughly as much as LIMIT 30000, because Postgres still has to produce and discard all the offset rows. With 35M+ rows and 100GB+ of data here, uploading this way is far too heavy. Is it possible to configure Logstash to use a WHERE condition instead of OFFSET, something like

WHERE COALESCE(c.updated_at, c.created_at) > :the_last_order_value

postgresql elasticsearch logstash devops etl
1 Answer

Try this SQL query:

SELECT
    c.*,
    COALESCE(c.updated_at, c.created_at) AS order_column,
    CASE
        WHEN ARRAY_AGG(ucv.user_id) = ARRAY[null]::integer[] THEN ARRAY[]::integer[]
        ELSE ARRAY_AGG(ucv.user_id)
    END AS viewed_by
FROM
    creatives AS c
LEFT JOIN
    user_creative_views ucv ON ucv.creative_id = c.id
WHERE
    COALESCE(c.updated_at, c.created_at) > :sql_last_value
GROUP BY
    c.id
-- Order by the tracking column so :sql_last_value always advances to
-- the newest timestamp actually fetched in this run
ORDER BY
    order_column ASC
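
For the WHERE-based approach to actually replace OFFSET paging, the jdbc input's built-in paging also has to be turned off; otherwise Logstash keeps wrapping the statement in a LIMIT ... OFFSET subquery regardless of the WHERE clause. A minimal sketch of the relevant input settings, assuming the driver, connection string, and credentials from the question are carried over (the LIMIT value of 10000 is an illustrative choice, not a required one):

jdbc {
  # ... jdbc_driver_library, jdbc_driver_class, jdbc_connection_string,
  #     jdbc_user, jdbc_password as in the question ...
  schedule => "*/5 * * * *"
  # Disable Logstash-side paging so the statement runs exactly as written,
  # without the LIMIT ... OFFSET wrapper.
  jdbc_paging_enabled => false
  use_column_value => true
  tracking_column => "order_column"
  tracking_column_type => "timestamp"
  record_last_run => true
  statement => "
    SELECT ... -- same SELECT list and joins as in the query above
    WHERE COALESCE(c.updated_at, c.created_at) > :sql_last_value
    GROUP BY c.id
    ORDER BY order_column ASC
    LIMIT 10000
  "
}

With this shape, each scheduled run picks up the next batch strictly after the last timestamp seen, so the batching cost stays bounded by the LIMIT instead of growing with the offset. One caveat: with a strict >, rows sharing the exact boundary timestamp of the previous batch can be skipped, so this works best when the tracking column has sufficient resolution (e.g. microsecond timestamps). Newer versions of the Logstash jdbc integration also offer a jdbc_paging_mode => "explicit" setting that lets the statement place :size and :offset itself, but for this use case disabling paging in favor of the tracking column is the simpler fix.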