我试图使用Spring Cloud Stream创建一个Kafka流应用,但在反序列化输入消息时遇到了困难,因为这些消息的值已经用 "Spring Cloud Stream "编码了。信息包. 如果有人能帮忙,我真的很感激。
这是我目前的情况。
// TransactionApplication.java
@SpringBootApplication
public class TransactionApplication {
public static void main(String[] args) {
SpringApplication.run(TransactionApplication.class, args);
}
public static class TransactionConsumer {
@Bean
public Serde<Transaction> transactionSerde() {
ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
return new JsonSerde<Transaction>(mapper);
}
@Bean
public Consumer<KStream<String, Transaction>> process() {
return input -> input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
}
// Transaction.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
String item;
Number amount;
}
我得到的错误是: java.lang.IllegalStateException: 头文件中没有类型信息,也没有提供默认类型。
我的application.yml是。
spring.cloud.stream:
bindings:
process-in-0:
destination: transactions
kafka:
streams:
binder:
applicationId: transactions-application
configuration:
commit.interval.ms: 100
在包含了... spring.json.value.default.type: com.example.Transaction
在...之下 configuration
节点,我得到了另一个错误。请看下面。
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 116, 101, 109, 34, 58, 32, 34, 112, 114, 105, 118, 97, 116, 101, 32, 106, 101, 116, 34, 44, 32, 34, 97, 109, 111, 117, 110, 116, 34, 58, 32, 53, 48, 50, 125]] from topic [transactions]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.jackdry.processors.json.Transaction` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (123)
at [Source: (byte[])"{"item": "private jet", "amount": 502}"; line: -1, column: 0]
你需要向反序列化器提供一个提示,告诉它要从编码的有效载荷中创建什么对象。
如果记录是由Spring JsonSerializer创建的,那么提示就在头文件中。
如果不是,你必须在流配置中提供提示。
你需要在流配置中显示你的 application.yml/properties
.