我将数据存储在 postgreSQL 作为数据源,我想加载 clickhouse 数据仓库的维度和事实表,我是 clickhouse 的新手,习惯使用 Talend 和 Microsoft SSIS 等传统集成工具来执行 ETL (PS 我正在为 clickhouse 和 postgreSQL 使用 docker 镜像)
您可以使用以下方法将数据从 Postgres 提取到 Clickhouse 中:
外部 ETL 工具,例如 Airbyte,并创建从 Postgres 到 Clikhouse 的连接器
Clickhouse 集成表引擎 创建从 Clickhouse 到 Postgres 数据的视图,然后使用
insert into
查询将该视图中的数据插入到真正的 Clickhouse 表中
这里是如何将 PostgreSQL 数据导入到相同形状的新 ClickHouse 表中的示例。假设您有一个名为
foo
的表,其中包含两列:id
和 foo
。
CREATE TABLE IF NOT EXISTS foo_pg
(
id UUID,
foo String
) ENGINE = PostgreSQL('host:port', 'database', 'tablename', 'username', 'password');
INSERT
上进行类型转换):CREATE TABLE IF NOT EXISTS foo
(
id UUID,
foo String
) ENGINE = MergeTree()
ORDER BY tuple()
PRIMARY KEY(id);
INSERT INTO foo (id, foo)
SELECT id, foo FROM foo_pg;
我们使用 Kafka 和 Debezium 通过 CDC(更改数据捕获)将数据从 PostgreSQL 流式传输到 ClickHouse。
su - postgres
psql -U debezium_user -d salary
CREATE TABLE IF NOT EXISTS employee
(
id bigint NOT NULL,
department character varying(255) NOT NULL,
employee_number character varying(255),
date_of_recruitment date,
CONSTRAINT employee_pkey PRIMARY KEY (id)
);
INSERT INTO employee VALUES (1, 'full time', '776E', now());
echo '{
"name": "shipments-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.server.name": "postgres",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "debezium_pw",
"database.dbname" : "salary",
"table.include.list": "public.employee",
"topic.prefix": "salary",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"snapshot.mode": "initial",
"tombstones.on.delete": "false",
"decimal.handling.mode": "double"
}
}' > debezium.json
curl -H 'Content-Type: application/json' debezium:8083/connectors --data "@debezium.json"
您应该看到 Kafka 主题中生成了一个名为 salal.public.employee 的主题
/usr/bin/kafka-topics --list --bootstrap-server kafka:9092
/usr/bin/kafka-console-consumer --bootstrap-server kafka:9092 --topic salary.public.employee --from-beginning
CREATE TABLE IF NOT EXISTS default.kafka_table
(
`id` UInt64,
`department` String,
`employee_number` String,
`date_of_recruitment` Date
)
ENGINE = ReplacingMergeTree
ORDER BY id;
这样,您就设置了从 PostgreSQL 到 ClickHouse 的测试 CDC 管道。您可以在this PostgreSQL > ClickHouse Streaming Guide看到更详细的解释。