我目前正在研究Kafka Connect将我们的一些数据库传输到数据湖。为了测试Kafka Connect我已经在我们的项目数据库之一设置了一个数据库。到目前为止一切都很好。
下一步我使用以下属性模式配置Kafka Connect:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "updated_at,created_at",
"incrementing.column.name": "id",
"dialect.name": "SqlServerDatabaseDialect",
"validate.non.null": "false",
"tasks.max": "1",
"mode": "timestamp+incrementing",
"topic.prefix": "mssql-jdbc-",
"poll.interval.ms": "10000",
}
虽然这适用于我的大多数表格,其中我有一个ID和一个created_at / updated_at字段,但它不适用于我的表格,在那里我用中间的表和复合键解决了我的多对多关系。请注意,我将通用JDBC配置与Microsoft的JDBC驱动程序一起使用。
有没有办法为这些特殊情况配置Kafka Connect?
您可能需要创建多个表,而不是一个连接器来拉动所有表。如果要使用不同的方法来获取数据或不同的ID /时间戳列,则会出现这种情况。正如@ cricket_007所说,你可以使用query
选项来提取查询结果 - 这可能是表示你的多表连接的SELECT
。即使从单个表对象中提取数据,JDBC连接器本身也只是从给定表中发出SELECT *
,并使用WHERE
谓词来限制基于递增ID /时间戳选择的行。
另一种方法是使用基于日志的更改数据捕获(CDC),并将所有更改直接从数据库流式传输到Kafka。
无论您使用JDBC还是基于日志的CDC,都可以使用流处理来解析Kafka本身的连接。一个例子是Kafka Streams或KSQL。我写过关于后者的lot here。
您可能还会发现this article有用,详细描述了将数据库与Kafka集成的选项。
免责声明:我在开源KSQL项目背后的公司Confluent工作。