我正试图用kafka流实现一个简单的计数器。它接受一个键值,如果相同的键值来了,就添加新的值。
这是我目前写的代码
package exercises;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;
public class LiveCounter {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "live-counter-2");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLine = builder.stream("counter-in");
KStream<String, Long> words = textLine
.filter((k, v) -> {
try {
String[] arr = v.split(" ");
Long.parseLong(arr[1]);
return true;
} catch (NumberFormatException e) {
return false;
}
})
.selectKey((k, v) -> v.split(" ")[0])
.mapValues((k, v) -> Long.parseLong(v.split(" ")[1]))
.groupByKey()
.reduce(Long::sum)
.toStream();
words.to("counter-out", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
System.out.println(streams.toString());
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
我们可以看到,拓扑结构很简单。它接受一个文本行并改变键,然后最后用reduce函数添加键组的所有值。
我试过流到mapValues为止,它是工作的。但是这个代码块造成了问题
.groupByKey()
.reduce(Long::sum)
.toStream();
这是堆栈跟踪
live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] ERROR org.apache.kafka.streams.errors.LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_0, topic: counter-in, partition: 0, offset: 1
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Shutting down
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] State transition from RUNNING to ERROR
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] All stream threads have died. The instance will be in error state and should be closed.
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Shutdown complete
Exception in thread "live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
[Thread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] State transition from ERROR to PENDING_SHUTDOWN
[kafka-streams-close-thread] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Informed to shut down
[kafka-streams-close-thread] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] State transition from PENDING_SHUTDOWN to NOT_RUNNING
[Thread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] Streams client stopped completely
任何帮助将被感激。先谢谢你
既然你的输入题目。counter-in
,由以下信息组成 String
s,而你的默认值序列器是 LongSerde
你需要告诉Kafka Streams将值反序列化为一个字符串,如下所示。
KStream<String, String> textLine = builder.stream("counter-in", Consumed.with(Serdes.String(), Serdes.String()));