I want to join a Kafka stream and a KTable. My PoC works fine with plain stream data, but as soon as I switch to CloudEvent I keep running into one serialization issue or another.
Here is a sample of my code:
Map<String, Object> ceSerializerConfigs = new HashMap<>();
ceSerializerConfigs.put(ENCODING_CONFIG, Encoding.STRUCTURED);
ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);
CloudEventSerializer serializer = new CloudEventSerializer();
serializer.configure(ceSerializerConfigs, false);
CloudEventDeserializer deserializer = new CloudEventDeserializer();
deserializer.configure(ceSerializerConfigs, false);
Serde<CloudEvent> cloudEventSerde = Serdes.serdeFrom(serializer, deserializer);
KStream<String, CloudEvent> kStream = builder.stream("stream-topic", Consumed.with(Serdes.String(), cloudEventSerde));
KTable<String, CloudEvent> kTable = builder.table("ktable-topic", Consumed.with(Serdes.String(), cloudEventSerde));
KStream<String, CloudEvent> joined = kStream
.join(kTable, (left, right) -> CloudEventBuilder.v1().withId(left.getId().concat(right.getId())).build());
joined.to(output, Produced.with(Serdes.String(), cloudEventSerde));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamProps);
kafkaStreams.start();
I also tried using a WrapperSerde, as described in "Problem configuring Serdes for Kafka Streams",
but I keep running into this exception:
18:12:08.691 [basic-streams-update-0630c691-0080-4e02-8c85-7bff650f34e9-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [basic-streams-update-0630c691-0080-4e02-8c85-7bff650f34e9] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000002, topic=cloudevent-ktable, partition=0, offset=80, stacktrace=java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
Caused by: java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:84) ~[cloudevents-kafka-2.5.0.jar:?]
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:38) ~[cloudevents-kafka-2.5.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:73) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:30) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:122) ~[kafka-streams-2.8.0.jar:?]
Has anyone used CloudEvent with a KTable successfully?
Your stack trace says:
Caused by: java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:84) ~[cloudevents-kafka-2.5.0.jar:?]
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:38) ~[cloudevents-kafka-2.5.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82) ~[kafka-streams-2.8.0.jar:?]
which essentially is this code in ValueAndTimestampSerializer:
final byte[] rawValue = valueSerializer.serialize(topic, data);
So the CloudEventSerializer you are using does not fit Kafka Streams here:
public byte[] serialize(String topic, CloudEvent data) {
throw new UnsupportedOperationException("CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)");
}
That's because Kafka Streams does not support (de)serializing headers when it writes values to its state stores.
I would suggest you extend CloudEventSerializer and override its serialize(String topic, CloudEvent data) to delegate to serialize(String topic, Headers headers, CloudEvent data) with an empty new RecordHeaders().
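A minimal sketch of that idea follows. The class names HeaderlessCloudEventSerializer and HeaderlessCloudEventDeserializer are my own invention, not part of cloudevents-kafka; and I'm assuming the deserializer needs the same treatment, since CloudEventDeserializer likewise prefers the Headers-aware overload:

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.header.internals.RecordHeaders;

// Hypothetical subclass: route the two-argument call that Kafka Streams
// makes (e.g. from state stores) to the three-argument overload, passing
// empty, throwaway headers.
class HeaderlessCloudEventSerializer extends CloudEventSerializer {
    @Override
    public byte[] serialize(String topic, CloudEvent data) {
        return serialize(topic, new RecordHeaders(), data);
    }
}

// Mirror the same trick on the deserializer side.
class HeaderlessCloudEventDeserializer extends CloudEventDeserializer {
    @Override
    public CloudEvent deserialize(String topic, byte[] data) {
        return deserialize(topic, new RecordHeaders(), data);
    }
}
```

You would then build the serde from these subclasses instead, e.g. Serdes.serdeFrom(new HeaderlessCloudEventSerializer(), new HeaderlessCloudEventDeserializer()), configured as before. Note that discarding headers means any header-borne metadata (such as the content-type the structured encoding writes) is lost, so this approach really only makes sense with STRUCTURED encoding, where the whole event lives in the record value.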