MSK Connect connector cannot find the connector class on an MSK cluster in KRaft mode


I have a Kafka 3.7.x cluster running in ZooKeeper mode, and it works well. Now I am trying the new Kafka in KRaft mode.


However, on this KRaft-mode Kafka cluster, the same connector fails to start.


Here is the full Kafka Connect error log:

org.apache.kafka.connect.errors.ConnectException: Failed to find any
class that implements Connector and which name matches
com.snowflake.kafka.connector.SnowflakeSinkConnector, available
connectors are: PluginDesc{klass=class
org.apache.kafka.connect.file.FileStreamSinkConnector,
name='org.apache.kafka.connect.file.FileStreamSinkConnector',
version='2.7.1', encodedVersion=2.7.1, type=sink, typeName='sink',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.file.FileStreamSourceConnector,
name='org.apache.kafka.connect.file.FileStreamSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.mirror.MirrorCheckpointConnector,
name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector',
version='1', encodedVersion=1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.mirror.MirrorHeartbeatConnector,
name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector',
version='1', encodedVersion=1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.mirror.MirrorSourceConnector,
name='org.apache.kafka.connect.mirror.MirrorSourceConnector',
version='1', encodedVersion=1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.MockConnector,
name='org.apache.kafka.connect.tools.MockConnector', version='2.7.1',
encodedVersion=2.7.1, type=connector, typeName='connector',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.MockSinkConnector,
name='org.apache.kafka.connect.tools.MockSinkConnector',
version='2.7.1', encodedVersion=2.7.1, type=sink, typeName='sink',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.MockSourceConnector,
name='org.apache.kafka.connect.tools.MockSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.SchemaSourceConnector,
name='org.apache.kafka.connect.tools.SchemaSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.VerifiableSinkConnector,
name='org.apache.kafka.connect.tools.VerifiableSinkConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.VerifiableSourceConnector,
name='org.apache.kafka.connect.tools.VerifiableSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}

Note that every connector in the log above has location='classpath', i.e. only Kafka Connect's built-in connectors were discovered, so the custom plugin was apparently never loaded. Both Kafka clusters use the same plugin, which is a ZIP file containing these JARs. You can see that snowflake-kafka-connector-2.2.2.jar does include

com.snowflake.kafka.connector.SnowflakeSinkConnector


The connector and plugin for both Kafka clusters are created by the same Terraform code, meaning all parameters are identical. The only difference is the kafka_version of the MSK cluster: 3.7.x vs. 3.7.x.kraft.

resource "aws_msk_cluster" "hm_amazon_msk_cluster" {
  cluster_name           = var.amazon_msk_cluster_name
  kafka_version          = "3.7.x.kraft" # <- only this line is different, the other one is `3.7.x`
  number_of_broker_nodes = var.kafka_broker_number
  storage_mode           = "TIERED"
  broker_node_group_info {
    instance_type   = "kafka.m7g.large"
    security_groups = [var.amazon_vpc_security_group_id]
    client_subnets  = var.amazon_vpc_subnet_ids
  }
  logging_info {
    broker_logs {
      s3 {
        enabled = true
        bucket  = var.kafka_broker_log_s3_bucket_name
        prefix  = "brokers"
      }
    }
  }
  encryption_info {
    encryption_at_rest_kms_key_arn = var.aws_kms_key_arn
  }
  client_authentication {
    sasl {
      iam = true
    }
  }
}
resource "aws_s3_object" "hm_amazon_s3_object" {
  bucket = var.s3_bucket_name
  key    = var.s3_key
  source = var.local_file_path
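  # filemd5 forces a re-upload whenever the local plugin ZIP changes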
  etag   = filemd5(var.local_file_path)
}
resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
  name         = var.amazon_msk_plugin_name
  content_type = "ZIP"
  location {
    s3 {
      bucket_arn = var.s3_bucket_arn
      file_key   = var.amazon_msk_plugin_s3_key
    }
  }
}
resource "aws_mskconnect_connector" "my_amazon_msk_connector" {
  name                 = var.amazon_msk_connector_name
  kafkaconnect_version = "2.7.1"
  capacity {
    autoscaling {
      mcu_count        = 1
      min_worker_count = 1
      max_worker_count = 2
      scale_in_policy {
        cpu_utilization_percentage = 40
      }
      scale_out_policy {
        cpu_utilization_percentage = 95
      }
    }
  }
  # https://docs.snowflake.com/en/user-guide/kafka-connector-install#label-kafka-properties
  # https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-kafka
  connector_configuration = {
    "connector.class"                      = "com.snowflake.kafka.connector.SnowflakeSinkConnector"
    "tasks.max"                            = 4
    "topics"                               = var.kafka_topic_name
    "buffer.count.records"                 = 10000
    "buffer.flush.time"                    = 5
    "buffer.size.bytes"                    = 20000000
    "snowflake.url.name"                   = "xx.snowflakecomputing.com"
    "snowflake.user.name"                  = var.snowflake_user_name
    "snowflake.private.key"                = var.snowflake_private_key
    "snowflake.private.key.passphrase"     = var.snowflake_private_key_passphrase
    "snowflake.role.name"                  = var.snowflake_role_name
    "snowflake.ingestion.method"           = "SNOWPIPE_STREAMING"
    "snowflake.enable.schematization"      = true
    "snowflake.database.name"              = var.snowflake_database_name
    "snowflake.schema.name"                = var.snowflake_schema_name
    "value.converter"                      = "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url"  = var.confluent_schema_registry_url
    "errors.log.enable"                    = true
    "errors.tolerance"                     = "all"
    "jmx"                                  = true
  }
  kafka_cluster {
    apache_kafka_cluster {
      bootstrap_servers = var.amazon_msk_cluster_bootstrap_servers
      vpc {
        security_groups = [var.amazon_vpc_security_group_id]
        subnets         = var.amazon_vpc_subnet_ids
      }
    }
  }
  kafka_cluster_client_authentication {
    authentication_type = "IAM"
  }
  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }
  plugin {
    custom_plugin {
      arn      = aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.arn
      revision = aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.latest_revision
    }
  }
  log_delivery {
    worker_log_delivery {
      s3 {
        bucket  = var.msk_log_s3_bucket_name
        prefix  = var.msk_log_s3_key
        enabled = true
      }
    }
  }
  service_execution_role_arn = var.amazon_msk_connector_iam_role_arn
}

Is there anything else I need to watch out for when using Kafka in KRaft mode? Could this be an MSK bug? Thanks!

apache-kafka terraform apache-kafka-connect aws-msk kraft
1 Answer

Well, this is a bit embarrassing. After adding depends_on between the resources, it now works fine. Nothing else changed.

I suspect that when the connector started, the plugin ZIP in S3 had not finished uploading yet.

resource "aws_s3_object" "hm_amazon_s3_object" {
  # ...
}

resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
  # ...
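  # explicit dependency: wait until the plugin ZIP has finished uploading to S3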
  depends_on = [
    aws_s3_object.hm_amazon_s3_object
  ]
}

resource "aws_mskconnect_connector" "my_amazon_msk_connector" {
  # ...
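  # explicit dependency: wait until the custom plugin has been created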
  depends_on = [
    aws_mskconnect_custom_plugin.hm_amazon_msk_plugin
  ]
}
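
As a side note, the same ordering can also be expressed implicitly: if the plugin's file_key references the uploaded aws_s3_object directly instead of a separate variable, Terraform infers the dependency on its own. A minimal sketch under that assumption (swapping var.amazon_msk_plugin_s3_key from the original code for an attribute reference):

resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
  name         = var.amazon_msk_plugin_name
  content_type = "ZIP"
  location {
    s3 {
      bucket_arn = var.s3_bucket_arn
      # referencing the uploaded object's key creates an implicit dependency,
      # so the plugin is only created after the ZIP upload completes
      file_key   = aws_s3_object.hm_amazon_s3_object.key
    }
  }
}

The connector-to-plugin edge already works this way, since the connector's plugin block references aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.arn and .latest_revision, so its extra depends_on is belt-and-braces.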