我使用Logstash上的jdbc接口将Postgres的部分数据上传到ElasticSearch。是否可以将 Logstash 配置为使用 WHERE 而不是 OFFSET?
我的配置:
input {
file {
path => "/var/log/logstash/logstash-plain.log"
type => "logstash-logs"
start_position => "beginning"
}
jdbc {
jdbc_driver_library => "/usr/share/logstash/external_jars/postgresql-42.5.4.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://172.17.0.1:5432/tyver_stage"
jdbc_user => "tyver_stage"
jdbc_password => "password"
schedule => "*/5 * * * *"
statement => "
SELECT
c.*,
COALESCE(c.updated_at, c.created_at) AS order_column,
CASE
WHEN ARRAY_AGG(ucv.user_id) = ARRAY[null]::integer[] THEN ARRAY[]::integer[]
ELSE ARRAY_AGG(ucv.user_id)
END AS viewed_by
FROM
creatives as c
LEFT JOIN
user_creative_views ucv ON ucv.creative_id = c.id
WHERE
(c.updated_at >= :sql_last_value OR (c.updated_at IS NULL AND c.created_at >= :sql_last_value))
GROUP BY
c.id
ORDER BY
COALESCE(c.updated_at, c.created_at) ASC
"
use_column_value => true
tracking_column => "order_column"
tracking_column_type => "timestamp"
jdbc_paging_enabled => true
jdbc_page_size => 10000
record_last_run => true
clean_run => false
}
}
filter {
}
output {
elasticsearch {
hosts => ["172.17.0.1:9200"]
index => "tyver_index_creatives"
document_id => "%{id}"
}
}
结果我有以下 SQL 查询:
SELECT * FROM (
SELECT
c.*,
COALESCE(c.updated_at, c.created_at) AS order_column,
CASE
WHEN ARRAY_AGG(ucv.user_id) = ARRAY[null]::integer[] THEN ARRAY[]::integer[]
ELSE ARRAY_AGG(ucv.user_id)
END AS viewed_by
FROM
creatives as c
LEFT JOIN
user_creative_views ucv ON ucv.creative_id = c.id
WHERE
(c.updated_at >= '1970-01-01 00:00:00.000000+0000' OR (c.updated_at IS NULL AND c.created_at >= '1970-01-01 00:00:00.000000+0000'))
GROUP BY
c.id
ORDER BY
COALESCE(c.updated_at, c.created_at) ASC
) AS "t1" LIMIT 10000 OFFSET 20000
资源非常昂贵,因为查询
LIMIT 10000 OFFSET 20000
等于 LIMIT 30000
。这里我有 35M+ 行和 100Gb+ 数据,使用这种方式上传它太重了。是否可以将 Logstash 配置为使用 WHERE 而不是像 WHERE COALESCE(c.updated_at, c.created_at) > :the_last_order_value
那样使用 OFFSET?
尝试这个 SQl 查询
SELECT
c.*,
COALESCE(c.updated_at, c.created_at) AS order_column,
CASE
WHEN ARRAY_AGG(ucv.user_id) = ARRAY[null]::integer[] THEN ARRAY[]::integer[]
ELSE ARRAY_AGG(ucv.user_id)
END AS viewed_by
FROM
creatives AS c
LEFT JOIN
user_creative_views ucv ON ucv.creative_id = c.id
WHERE
COALESCE(c.updated_at, c.created_at) > :sql_last_value
GROUP BY
c.id