I am developing a Kafka Streams example in Java, and when I start the application I get the following error.
Error:
17:34:38.930 [kafka-admin-client-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-admin] DEBUG org.apache.kafka.clients.NetworkClient -- [AdminClient clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-admin] Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-admin, correlationId=1, headerVersion=2): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=1, host='localhost', port=9092, rack=null)], clusterId='MkU3OEVBNTcwNTJENDM2Qg', controllerId=1, topics=[], clusterAuthorizedOperations=-2147483648)
17:34:38.924 [kafka-producer-network-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] DEBUG org.apache.kafka.clients.NetworkClient -- [Producer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] Version mismatch when attempting to send GetTelemetrySubscriptionsRequestData(clientInstanceId=AAAAAAAAAAAAAAAAAAAAAA) with correlation id 3 to -1
org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support GET_TELEMETRY_SUBSCRIPTIONS
17:34:38.931 [main] INFO org.apache.kafka.streams.KafkaStreams -- stream-client [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf] Started 1 stream threads
17:34:38.931 [kafka-producer-network-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -- The broker generated an error for the get telemetry network API request
org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support GET_TELEMETRY_SUBSCRIPTIONS
17:34:38.931 [kafka-producer-network-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -- Updating intervalMs: 300000, lastRequestMs: 1713614678931
17:34:38.932 [kafka-producer-network-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -- Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to SUBSCRIPTION_NEEDED
17:34:38.931 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread -- stream-thread [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] Starting
17:34:38.934 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread -- stream-thread [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] State transition from CREATED to STARTING
17:34:38.936 [kafka-admin-client-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-admin] DEBUG org.apache.kafka.clients.admin.internals.AdminMetadataManager -- [AdminClient clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-admin] Updating cluster metadata to Cluster(id = MkU3OEVBNTcwNTJENDM2Qg, nodes = [localhost:9092 (id: 1 rack: null)], partitions = [], controller = localhost:9092 (id: 1 rack: null))
17:34:38.933 [kafka-producer-network-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] DEBUG org.apache.kafka.clients.NetworkClient -- [Producer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer, correlationId=1, headerVersion=2): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=1, host='localhost', port=9092, rack=null)], clusterId='MkU3OEVBNTcwNTJENDM2Qg', controllerId=1, topics=[], clusterAuthorizedOperations=-2147483648)
17:34:38.940 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Subscribed to topic(s): words
17:34:38.940 [kafka-producer-network-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] INFO org.apache.kafka.clients.Metadata -- [Producer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] Cluster ID: MkU3OEVBNTcwNTJENDM2Qg
17:34:38.940 [kafka-producer-network-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] DEBUG org.apache.kafka.clients.Metadata -- [Producer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='MkU3OEVBNTcwNTJENDM2Qg', nodes={1=localhost:9092 (id: 1 rack: null)}, partitions=[], controller=localhost:9092 (id: 1 rack: null)}
17:34:38.941 [kafka-producer-network-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] DEBUG org.apache.kafka.clients.NetworkClient -- [Producer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] Received INIT_PRODUCER_ID response from node -1 for request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer, correlationId=2, headerVersion=2): InitProducerIdResponseData(throttleTimeMs=0, errorCode=0, producerId=26, producerEpoch=0)
17:34:38.941 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamThread -- stream-thread [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] Invoking poll on main Consumer
17:34:38.941 [kafka-producer-network-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] INFO org.apache.kafka.clients.producer.internals.TransactionManager -- [Producer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] ProducerId set to 26 with epoch 0
17:34:38.941 [kafka-producer-network-thread | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager -- [Producer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-producer] Transition from state INITIALIZING to READY
17:34:38.942 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Sending FindCoordinator request to broker localhost:9092 (id: -1 rack: null)
17:34:38.944 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.ClientUtils -- Resolved host localhost as 127.0.0.1
17:34:38.944 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.ClusterConnectionStates -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Resolved host localhost to addresses [localhost/127.0.0.1]
17:34:38.945 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1
17:34:38.950 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.common.network.Selector -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1
17:34:38.951 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Completed connection to node -1. Fetching API versions.
17:34:38.951 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Initiating API versions fetch from node -1.
17:34:38.951 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, correlationId=1, headerVersion=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.7.0')
17:34:38.957 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, correlationId=1, headerVersion=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, maxVersion=15), ApiVersion(apiKey=2, minVersion=0, maxVersion=8), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=4), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=4), ApiVersion(apiKey=25, minVersion=0, maxVersion=3), ApiVersion(apiKey=26, minVersion=0, maxVersion=3), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=3), ApiVersion(apiKey=30, minVersion=0, maxVersion=3), ApiVersion(apiKey=31, minVersion=0, maxVersion=3), ApiVersion(apiKey=32, minVersion=0, 
maxVersion=4), ApiVersion(apiKey=33, minVersion=0, maxVersion=2), ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=4), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=3), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=3), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=55, minVersion=0, maxVersion=1), ApiVersion(apiKey=57, minVersion=0, maxVersion=1), ApiVersion(apiKey=60, minVersion=0, maxVersion=0), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=64, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[SupportedFeatureKey(name='metadata.version', minVersion=1, maxVersion=14)], finalizedFeaturesEpoch=5580, finalizedFeatures=[FinalizedFeatureKey(name='metadata.version', maxVersionLevel=14, minVersionLevel=14)], zkMigrationReady=false)
17:34:38.961 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Node -1 has finalized features epoch: 5580, finalized features: [FinalizedFeatureKey(name='metadata.version', maxVersionLevel=14, minVersionLevel=14)], supported features: [SupportedFeatureKey(name='metadata.version', minVersion=1, maxVersion=14)], ZK migration ready: false, API versions: (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 15 [usable: 15], ListOffsets(2): 0 to 8 [usable: 8], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): UNSUPPORTED, StopReplica(5): UNSUPPORTED, UpdateMetadata(6): UNSUPPORTED, ControlledShutdown(7): UNSUPPORTED, OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 8], FindCoordinator(10): 0 to 4 [usable: 4], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 4], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 4 [usable: 4], AddOffsetsToTxn(25): 0 to 3 [usable: 3], EndTxn(26): 0 to 3 [usable: 3], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 4 [usable: 4], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 
to 3 [usable: 3], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 3 [usable: 3], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], DescribeQuorum(55): 0 to 1 [usable: 1], AlterPartition(56): UNSUPPORTED, UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): UNSUPPORTED, DescribeCluster(60): 0 [usable: 0], DescribeProducers(61): 0 [usable: 0], UnregisterBroker(64): 0 [usable: 0], DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 [usable: 0], AllocateProducerIds(67): UNSUPPORTED, ConsumerGroupHeartbeat(68): UNSUPPORTED, ConsumerGroupDescribe(69): UNSUPPORTED, GetTelemetrySubscriptions(71): UNSUPPORTED, PushTelemetry(72): UNSUPPORTED, ListClientMetricsResources(74): UNSUPPORTED).
17:34:38.961 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='words')], allowAutoTopicCreation=false, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null)
17:34:38.961 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, correlationId=2, headerVersion=2) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='words')], allowAutoTopicCreation=false, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
17:34:38.962 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Give up sending telemetry request since no node is available
17:34:38.962 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, correlationId=0, headerVersion=2) and timeout 30000 to node -1: FindCoordinatorRequestData(key='', keyType=0, coordinatorKeys=[my-ktable-demo])
17:34:38.963 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Give up sending telemetry request since no node is available
17:34:38.963 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -- Creating telemetry subscription request with client instance id AAAAAAAAAAAAAAAAAAAAAA
17:34:38.963 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -- Setting telemetry state from SUBSCRIPTION_NEEDED to SUBSCRIPTION_IN_PROGRESS
17:34:38.963 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient -- [Consumer clientId=my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1-consumer, groupId=my-ktable-demo] Version mismatch when attempting to send GetTelemetrySubscriptionsRequestData(clientInstanceId=AAAAAAAAAAAAAAAAAAAAAA) with correlation id 3 to -1
org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support GET_TELEMETRY_SUBSCRIPTIONS
17:34:38.963 [my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-StreamThread-1] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -- The broker generated an error for the get telemetry network API request
org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support GET_TELEMETRY_SUBSCRIPTIONS
I don't know what is causing the problem.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>09-ktable-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.4</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.15.4</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.4.14</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.4.14</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.7</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.10.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.9.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-test-utils</artifactId>
            <version>3.3.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Maven Compiler Plugin-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
KTableDemo.java
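import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;

@Slf4j
public class KTableDemo {
    // Input topic; the log above shows the consumer subscribing to "words".
    private static final String WORDS = "words";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-ktable-demo"); // consumer group
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        // Read the topic as a KTable backed by the "words-store" state store.
        KTable<String, String> wordsTable = streamsBuilder
                .table(WORDS, Consumed.with(Serdes.String(), Serdes.String()), Materialized.as("words-store"));
        // Keep only values longer than two characters and print them.
        wordsTable.filter((key, value) -> value.length() > 2)
                .toStream()
                .print(Printed.<String, String>toSysOut().withLabel("words-table"));
        KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(), props);
        myStream.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Stopping Stream");
            myStream.close();
        }));
    }
}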
I had the same problem; the solution was to add the
enable.metrics.push = false
setting.
In code this would most likely look like this:
props.put(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
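For context, the log in the question lists GetTelemetrySubscriptions(71) and PushTelemetry(72) as UNSUPPORTED by the broker, and the exceptions are logged at DEBUG level by both the producer and the consumer client. Below is a minimal sketch of the same properties with the setting applied, written with plain string keys so it runs without the Kafka jars. The class name TelemetryOffConfig and the method buildProps are made up for illustration; the keys are the string values behind the constants used in the question, and I am assuming Kafka Streams forwards the unprefixed enable.metrics.push entry to its internal clients.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original application.
public class TelemetryOffConfig {

    // Builds the same settings as the question, plus the telemetry opt-out.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "my-ktable-demo");     // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", "localhost:9092");  // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("auto.offset.reset", "latest");          // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
        // Stop the clients from asking the broker for telemetry subscriptions.
        props.put("enable.metrics.push", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        System.out.println("enable.metrics.push = " + props.getProperty("enable.metrics.push"));
    }
}
```

In the real application you would pass the resulting Properties to new KafkaStreams(streamsBuilder.build(), props), exactly as in the question.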