我是 Kafka Streams 新手,我正在尝试拼凑我的第一个应用程序。
我想添加我的银行交易金额。
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
JsonSerde<BankTransaction> jsonSerde = new JsonSerde<>(BankTransaction.class);
jsonSerde.configure(Map.of("spring.json.type.mapping", "BankTransaction:nl.sourcelabs.kafkasolo.streamprocessing.BankTransaction"), false);
return kStreamBuilder.stream("transactions", Consumed.with(Serdes.String(), jsonSerde))
.mapValues((readOnlyKey, value) -> value.amount)
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.aggregate(() -> 0L, (key, value, aggregate) -> aggregate + value,
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("transactionAmountCount")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
.toStream();
}
从主题中读取事件并正确反序列化。
但是应用程序崩溃并出现以下错误:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=transactions, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:30)
at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:24)
at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:58)
at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:31)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:427)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$5(MeteredKeyValueStore.java:318)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:887)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:318)
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:92)
at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.get(KeyValueStoreWrapper.java:76)
at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:107)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
我不明白当只放入 Long 时,LongDeserializer 如何获取单字节 KeyValueStore。
任何帮助将不胜感激。
我尝试过调试代码,但我所能看到的是
transactionAmountCount
的变更日志正在被读取,这就是一切都崩溃的地方。
从错误来看,解串器值似乎是为整数定义的(4 个字节link)。
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
如果您查看日志的其余部分,您可以看到它指示正在使用
IntegerDeserializer
。
确保您使用与您的数据匹配的正确解串器,并且已正确配置键和值。