我们正在使用 Kafka Streams 进行一些流处理,我们在可观测系统中看到一些奇怪的日志。
一切正常,处理工作没有任何问题,但 INFO 日志有点令人担忧。
这是我们的流定义
@Bean
public KTable < Long, String > kStream(StreamsBuilder streamsBuilder) {
KStream < Long, String > stream1 = streamsBuilder.stream(topic1);
KStream < Long, String > stream2 = streamsBuilder.stream(topic2);
stream1.merge(stream2)
.selectKey((userId, v) - > userId)
.groupByKey()
.aggregate(
CustomObject::new,
(aggKey, newValue, aggValue) - > aggValue.aggregate(newValue),
Materialized
. < Long, OutputObject, KeyValueStore < Bytes, byte[] >> as(storeName)
.withKeySerde(Serdes.Long())
.withValueSerde(outputCustomSerde)
.withStoreType(Materialized.StoreType.IN_MEMORY)
.withCachingDisabled()
)
.toStream()
.to(topic3);
return null;
}
基于流定义 - 创建 2 个 kafka 主题 - 一个
changelog
和一个 repartition
。
以下是我们得到的 3 条信息日志:
[AdminClient clientId=test-kafka-streams-v2-ade9efd5-bf02-4f98-baf7-97db2bfcb2ef-admin] Cancelled in-flight DELETE_RECORDS request with correlation id 1780227 due to node 1 being disconnected (elapsed time since creation: 3ms, elapsed time since send: 3ms, request timeout: 2ms)
[AdminClient clientId=test-kafka-streams-v2-ade9efd5-bf02-4f98-baf7-97db2bfcb2ef-admin] Cancelled in-flight METADATA request with correlation id 1845715 due to node 1 being disconnected (elapsed time since creation: 19ms, elapsed time since send: 19ms, request timeout: 18ms)
[AminClient clientId=test-kafka-streams-v2-ade9efd5-bf02-4f98-baf7-97db2bfcb2ef-admin] Disconnecting from node 1 due to request timeout.
我们的问题是 - 我们应该担心这 3 个日志吗?
如果是 - 我们能做什么?
如果不是——我们应该以某种方式抑制它还是忽略它?
非常感谢
我们应该担心那 3 个日志吗
不。管理客户端只需重新连接并重试。
我们应该以某种方式压制它还是忽略它?
我会忽略它。当然,您也可以更改日志级别。