I have an S3 sink connector that reads from a Kafka topic and writes the data to S3. The connector is not consuming from the topic.
Here is the connector configuration:
{
  "name": "my-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "behavior.on.null.values": "ignore",
    "s3.region": "<aws region>",
    "topics.dir": "my-topic",
    "flush.size": "1000",
    "tasks.max": "1",
    "s3.part.size": "5242880",
    "timezone": "UTC",
    "rotate.interval.ms": "30000",
    "locale": "en-US",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "aws.access.key.id": "<aws access key>",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "s3.bucket.name": "<aws bucket>",
    "partition.duration.ms": "30000",
    "schema.compatibility": "NONE",
    "topics": "my-topic",
    "aws.secret.access.key": "<aws secret key>",
    "task.class": "io.confluent.connect.s3.S3SinkTask",
    "errors.deadletterqueue.topic.name": "dlq-my-topic",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "name": "my-connector",
    "errors.tolerance": "all",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "rotate.schedule.interval.ms": "60000",
    "timestamp.extractor": "Record"
  }
}
Here is the connector status:
curl localhost:8083/connectors/my-connector/status
{
  "name": "my-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "localhost:8083"
    }
  ],
  "type": "sink"
}
Here is the Kafka consumer group information:
./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group connect-my-connector --describe
GROUP | TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID |
---|---|---|---|---|---|---|---|---|
connect-my-connector | my-topic | 0 | 1182 | 12072 | 10890 | connector-consumer-my-connector-0-68793e0d-8312-4d20-b23c-5221ca54b0dc | ip | connector-consumer-my-connector-0 |
So it appears kafka-connect has an active consumer in the consumer group. What could be the reason the connector is not consuming from Kafka?
I looked at this - that answer does not apply, since authentication is not enabled on our Kafka cluster. What else could cause a connected kafka-connect connector to not consume messages from the topic? There are no obvious errors in the logs during Connect startup, plugin loading, or connector initialization.
Yes, the configuration above did not consume for me either. When I changed the value converter configuration as shown below, it started consuming and writing to S3.
You may also want to turn up the Connect server logging to TRACE for the S3 sink connector packages to get more information.
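A minimal sketch of that logging change, assuming the stock connect-log4j.properties file used by the Connect worker (the package names below are for the Confluent S3 connector; adjust them to your install):

```properties
# connect-log4j.properties - raise logging for the S3 sink connector packages
log4j.logger.io.confluent.connect.s3=TRACE
log4j.logger.io.confluent.connect.storage=TRACE
```

Restart the Connect worker after editing the file so the new log levels take effect.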
{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "behavior.on.null.values": "ignore",
    "s3.region": "$REGION",
    "topics.dir": "pageviews",
    "flush.size": "10",
    "tasks.max": "1",
    "s3.part.size": "5242880",
    "timezone": "UTC",
    "rotate.interval.ms": "600000",
    "locale": "en-US",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "aws.access.key.id": "${AWS_ACCESS_KEY_ID}",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "s3.bucket.name": "s3-sink-test",
    "partition.duration.ms": "30000",
    "schema.compatibility": "NONE",
    "topics": "pageviews",
    "aws.secret.access.key": "${AWS_SECRET_ACCESS_KEY}",
    "task.class": "io.confluent.connect.s3.S3SinkTask",
    "errors.deadletterqueue.topic.name": "dlq-my-topic",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "errors.tolerance": "all",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm",
    "rotate.schedule.interval.ms": "60000",
    "timestamp.extractor": "Record"
  }
}
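A plausible explanation (not confirmed here, but consistent with the fix) is the converter/format mismatch in the original config: AvroFormat needs schema-bearing records, while JsonConverter produces schemaless data unless the messages carry the JSON schema envelope. If you want to keep plain JSON values rather than switching to Avro and a Schema Registry, a hedged alternative sketch is to pair JsonConverter with JsonFormat instead:

```json
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat"
```

With errors.tolerance set to "all" and a DLQ configured, conversion failures may be routed away silently, so also check the dlq-my-topic topic for records when the sink appears idle.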