Kafka Streams - why can't I aggregate and sum my Longs?


I'm new to Kafka Streams and I'm trying to put together my first application.

I want to sum up my bank transaction amounts.

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    JsonSerde<BankTransaction> jsonSerde = new JsonSerde<>(BankTransaction.class);
    jsonSerde.configure(Map.of("spring.json.type.mapping", "BankTransaction:nl.sourcelabs.kafkasolo.streamprocessing.BankTransaction"), false);
    return kStreamBuilder.stream("transactions", Consumed.with(Serdes.String(), jsonSerde))
            .mapValues((readOnlyKey, value) -> value.amount)
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .aggregate(() -> 0L, (key, value, aggregate) -> aggregate + value,
                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("transactionAmountCount")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(Serdes.Long()))
            .toStream();
}

The events are read from the topic and deserialized correctly.

But the application crashes with the following error:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=transactions, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
    at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:30)
    at org.apache.kafka.common.serialization.IntegerDeserializer.deserialize(IntegerDeserializer.java:24)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:58)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:31)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:427)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$5(MeteredKeyValueStore.java:318)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:887)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:318)
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:92)
    at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.get(KeyValueStoreWrapper.java:76)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:107)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)

I don't understand how an IntegerDeserializer gets involved when I only ever put Longs into the KeyValueStore.

Any help would be greatly appreciated.

I have tried debugging the code, but all I can see is that the changelog for transactionAmountCount is being read, and that is where everything falls apart.

java spring spring-kafka apache-kafka-streams
1 Answer

From the error, it looks like the value deserializer is defined for Integer (4 bytes):

org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
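To make the size check concrete: a serialized long is always 8 bytes, while an Integer deserializer rejects any payload that is not exactly 4 bytes. The two helper methods below are a minimal stdlib-only sketch that mimics this behavior (they are illustrative stand-ins, not the actual Kafka serde classes):

```java
import java.nio.ByteBuffer;

public class SerdeSizeDemo {
    // Mimics a LongSerializer: a long is always written as 8 big-endian bytes.
    static byte[] serializeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Mimics an IntegerDeserializer: rejects any payload that is not exactly 4 bytes.
    static int deserializeInt(byte[] data) {
        if (data.length != Integer.BYTES) {
            throw new IllegalArgumentException("Size of data received by IntegerDeserializer is not 4");
        }
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] stored = serializeLong(42L); // 8 bytes, as written to the store/changelog
        try {
            deserializeInt(stored);         // reading it back with a 4-byte serde fails
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is exactly the shape of the failure in the stack trace: the bytes in the state store and the deserializer configured for reading them disagree about the value's type.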

If you look at the rest of the log, you can see that it shows an IntegerDeserializer being used.
Make sure you are using the correct deserializer for your data, and that both the key and value serdes are configured correctly.
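One possible cause (an assumption, since the BankTransaction class is not shown in the question): if amount is declared as int or Integer, then mapValues produces a stream of Integers, even though Grouped and Materialized declare Long serdes. A small sketch of the boxing behavior, using a hypothetical BankTransaction stand-in:

```java
public class AmountTypeDemo {
    // Hypothetical stand-in for the asker's BankTransaction; the real class is not shown.
    static class BankTransaction {
        int amount; // if declared int/Integer, value.amount yields an Integer, not a Long
        BankTransaction(int amount) { this.amount = amount; }
    }

    public static void main(String[] args) {
        BankTransaction tx = new BankTransaction(100);

        Object boxed = tx.amount;              // autoboxes to Integer
        System.out.println(boxed.getClass());  // class java.lang.Integer

        Object widened = (long) tx.amount;     // explicit widening, then boxes to Long
        System.out.println(widened.getClass()); // class java.lang.Long
    }
}
```

If that is the case here, widening explicitly in the topology, e.g. `.mapValues((readOnlyKey, value) -> (long) value.amount)`, would make the stream's values actually match `Serdes.Long()`. Also note that if the changelog topic or local state already contains records written with a different serde from an earlier run, the store will keep failing on read until that stale state is cleared.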
