如何在 Kafka 连接中将微秒时间戳转换为毫秒?

问题描述 投票:0回答:1

使用 confluenceinc/kafka-connect-bigquery 从 Kafka 填充 BigQuery 时,我想使用域 (Avro) 事件的时间戳来(日)对表进行分区。

我的

connector.properties
看起来如下:

[...]
transforms=ConvertTimestamp
transforms.ConvertTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ConvertTimestamp.field=metadata_date
transforms.ConvertTimestamp.target.type=Timestamp

timestampPartitionFieldName=metadata_date
[...]
Exception in thread "pool-5-thread-522" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
    [row index 0]: invalid: Timestamp field value is out of range:1597279521000000000
    [row index 1]: invalid: Timestamp field value is out of range:1597279523000000000
[...]

问题似乎是,我们的时间戳以微秒为单位(UTC Unix 纪元)

"type": {
    "type": "long",
    "logicalType": "timestamp-micros"
}

而 BigQuery 需要毫秒(或秒?)。

有没有办法直接使用连接器转换时间戳?

apache-kafka google-bigquery avro apache-kafka-connect confluent-schema-registry
1个回答
0
投票

这个问题现在似乎已经得到解决。您可以使用附加转换参数

unix.precision
来确保正确解析类型为
timestamp-micros
的 Avro 字段:

transforms=ConvertTimestamp
transforms.ConvertTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ConvertTimestamp.field=metadata_date
transforms.ConvertTimestamp.target.type=Timestamp
transforms.ConvertTimestamp.unix.precision=microseconds

似乎 Confluence 文档 尚未更新以反映这一点,但相应的提案文档 对此事提供了更多信息。

© www.soinside.com 2019 - 2024. All rights reserved.