在我的一个用例中,我正在尝试创建一个管道
每当我从自定义分区发送消息时,我都会使用LONG数据类型以毫秒为单位发送时间戳,因为在架构中,timestamp列已定义为long。
我之前在自定义分区中的代码:
Date date = new Date();
long timeMilli = date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);
在我发送记录之前显示结果:
date = Tue Mar 26 22:02:04 EDT 2019,time in millis = 1553652124063
在table2中的timestamp列中插入的值:
3/27/2019 2:02:04.063000 AM
由于它采用英国时区(我相信),我暂时修复了从当前时间戳减去4小时的时间,以便我可以匹配美国EST时间戳。
Date date = new Date();
Date adj_date = DateUtils.addHours(date,-4);
long timeMilli = adj_date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);
显示结果:
date = Tue Mar 26 22:04:43 EDT 2019,time in millis = 1553637883826
在table2中的timestamp列中插入的值:
3/26/2019 10:04:43.826000 PM
如果我错过任何东西,请告诉我,因为当我从自定义分区发送消息时,我不确定为什么会发生这种情况。
在引擎盖下Jdbc Source Connector使用以下查询:
SELECT * FROM someTable
WHERE
someTimestampColumn < $endTimetampValue
AND (
(someTimestampColumn = $beginTimetampValue AND someIncrementalColumn > $lastIncrementedValue)
OR someTimestampColumn > $beginTimetampValue)
ORDER BY someTimestampColumn, someIncrementalColumn ASC
总结:如果查询的时间戳列的值早于当前时间戳并且晚于上次检查,则查询将检索行。
以上参数是:
beginTimetampValue
- 上次导入记录的timestamp列的值endTimetampValue
- 根据数据库的当前时间戳lastIncrementedValue
- 上次导入记录的增量列的值我认为在你的情况下,Producer
将带有更高时间戳的Tables记录放入比以后手动插入(使用查询)。
当Jdbc Connector检查要导入Kafka的新记录时,它会跳过它们(因为它们没有满足someTimestampColumn < $endTimetampValue
时间戳条件)
您还可以将日志级别更改为DEBUG
,并查看日志中发生的情况