Kafka Connect - S3 Sink not consuming from topic

Problem description

I have an S3 sink connector that reads data from a Kafka topic and writes it to S3. The connector is not consuming from the topic.

Here is the connector configuration:

{
  "name": "my-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "behavior.on.null.values": "ignore",
    "s3.region": "<aws region>",
    "topics.dir": "my-topic",
    "flush.size": "1000",
    "tasks.max": "1",
    "s3.part.size": "5242880",
    "timezone": "UTC",
    "rotate.interval.ms": "30000",
    "locale": "en-US",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "aws.access.key.id": "<aws access key>",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "s3.bucket.name": "<aws bucket>",
    "partition.duration.ms": "30000",
    "schema.compatibility": "NONE",
    "topics": "my-topic",
    "aws.secret.access.key": "<aws secret key>",
    "task.class": "io.confluent.connect.s3.S3SinkTask",
    "errors.deadletterqueue.topic.name": "dlq-my-topic",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "name": "my-connector",
    "errors.tolerance": "all",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "rotate.schedule.interval.ms": "60000",
    "timestamp.extractor": "Record"
  }
}

Here is the connector status:

curl localhost:8083/connectors/my-connector/status
{
  "name": "my-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "localhost:8083"
    }
  ],
  "type": "sink"
}

Here is the Kafka consumer group information:

./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group connect-my-connector --describe

GROUP                 TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID                                                              HOST  CLIENT-ID
connect-my-connector  my-topic  0          1182            12072           10890  connector-consumer-my-connector-0-68793e0d-8312-4d20-b23c-5221ca54b0dc  ip    connector-consumer-my-connector-0

So it appears kafka-connect has an active consumer in the consumer group. What could be the reason the connector is not consuming from Kafka?

I have looked at this - that answer does not apply, because authentication is not enabled on our Kafka cluster. What else could cause a connected kafka-connect connector to not consume messages from the topic? There are no obvious errors in the logs during Connect startup, plugin loading, or connector initialization.
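One thing worth checking (a suggestion not in the original post): with `errors.tolerance` set to `all` and a dead letter queue configured, records that fail conversion can be routed to the DLQ silently instead of failing the task, which can look like the connector is "not consuming" into S3. A quick sketch of inspecting the DLQ topic:

```shell
# Check whether failing records are being routed to the DLQ instead of S3.
# Error context appears in record headers only if the connector also sets
# errors.deadletterqueue.context.headers.enable=true (not set in the config above).
./kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic dlq-my-topic \
  --from-beginning \
  --property print.headers=true
```

If records show up here, the failure reason (for example, a converter/deserialization error) is the place to look next.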

apache-kafka apache-kafka-connect s3-kafka-connector
1 Answer

0 votes

Yes, the configuration above did not consume for me either. When I changed the value converter configuration as shown below, it started consuming and writing to S3.

You may also want to set the Connect worker's logging to TRACE for the S3 sink connector packages to get more information.
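For example, on Connect workers that expose the admin loggers REST endpoint (available since Apache Kafka 2.4, KIP-495), the level can be raised at runtime without a restart. The logger name below assumes the Confluent S3 connector's package:

```shell
# Raise the log level for the S3 sink connector classes at runtime.
curl -s -X PUT -H "Content-Type: application/json" \
  -d '{"level": "TRACE"}' \
  localhost:8083/admin/loggers/io.confluent.connect.s3

# Confirm the change took effect.
curl -s localhost:8083/admin/loggers/io.confluent.connect.s3
```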

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "behavior.on.null.values": "ignore",
    "s3.region": "$REGION",
    "topics.dir": "pageviews",
    "flush.size": "10",
    "tasks.max": "1",
    "s3.part.size": "5242880",
    "timezone": "UTC",
    "rotate.interval.ms": "600000",
    "locale": "en-US",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "aws.access.key.id": "${AWS_ACCESS_KEY_ID}",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "s3.bucket.name": "s3-sink-test ",
    "partition.duration.ms": "30000",
    "schema.compatibility": "NONE",
    "topics": "pageviews",
    "aws.secret.access.key": "${AWS_SECRET_ACCESS_KEY}",
    "task.class": "io.confluent.connect.s3.S3SinkTask",
    "errors.deadletterqueue.topic.name": "dlq-my-topic",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "errors.tolerance": "all",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm",
    "rotate.schedule.interval.ms": "60000",
    "timestamp.extractor": "Record"
  }
}
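Once the connector is consuming, one way to confirm output is to list the objects the sink writes (bucket name and `topics.dir` taken from the config above; the partitioned paths under them are generated by the connector):

```shell
# List the first few objects written by the sink under the configured topics.dir.
aws s3 ls s3://s3-sink-test/pageviews/ --recursive | head
```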