我的配置如下:
spring:
application:
name: transaction-enricher-application
integration:
poller:
fixed-delay: 5000
cloud:
stream:
kafka:
binder:
brokers: broker:9092 # Switch here to local instance when running on localhost
bindings:
enrichTransaction-in-0:
consumer:
batch-mode: true
configuration:
value:
deserializer: com.example.demo.serdes.TransactionDeserializer
destination: approvalRequest-out-0
enrichTransaction-out-0:
producer:
useNativeEncoding: true
configuration:
value:
serializer: com.example.demo.serdes.EnrichedTransactionSerializer
这是我的服务类,它接收并发布消息:
package com.example.demo.enricher;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Bean;
import com.example.demo.service.EnrichmentService;
import java.util.function.Function;
import com.example.demo.domain.EnrichedTransaction;
import com.example.demo.domain.Transaction;
@Configuration
public class CashCardTransactionEnricher {
@Bean
EnrichmentService enrichmentService() {
return new EnrichmentService();
}
@Bean
public Function<Transaction, EnrichedTransaction> enrichTransaction(EnrichmentService enrichmentService) {
return transaction -> {
return enrichmentService.enrichTransaction(transaction);
};
}
}
我的序列化器是这样的:
package com.example.demo.serdes;
import com.example.demo.domain.EnrichedTransaction;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
public class EnrichedTransactionSerializer implements Serializer<EnrichedTransaction> {
private JsonSerializer<EnrichedTransaction> jsonSerializer = new JsonSerializer<>();
@Override
public byte[] serialize(String topic, EnrichedTransaction data) {
return jsonSerializer.serialize(topic, data);
}
}
我的解串器是这样的:
package com.example.demo.serdes;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.example.demo.domain.Transaction;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
import java.io.IOException;
import com.example.demo.domain.CashCard;
public class TransactionDeserializer implements Deserializer<Transaction> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public Transaction deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.readValue(data, Transaction.class);
} catch (JsonProcessingException e) {
System.out.println("JsonProcessingException occured while deserializing Transaction" + e.getMessage());
} catch (Exception e) {
System.out.println("Error deserializing Transaction: " + e.getMessage());
return new Transaction(0L, new CashCard(0L, "Error", 0.0));
}
return new Transaction(1L, new CashCard(1L, "Test Owner", 3.14)); // just to avoid any errors
}
@Override
public void close() {}
}
最后我用于批准交易的丰富服务如下:
package com.example.demo.service;
import java.util.UUID;
import com.example.demo.domain.ApprovalStatus;
import com.example.demo.domain.CardHolderData;
import com.example.demo.domain.EnrichedTransaction;
import com.example.demo.domain.Transaction;
public class EnrichmentService {
public EnrichedTransaction enrichTransaction(Transaction transaction) {
return new EnrichedTransaction(
transaction.id(),
transaction.cashCard(),
ApprovalStatus.APPROVED,
new CardHolderData(
UUID.randomUUID(),
transaction.cashCard().owner(),
"123 Main street"
)
);
}
}
我收到的错误是这样的:
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class reactor.core.publisher.FluxPeekFuseable to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
我真的不知道这个问题的根本原因是什么
我正确使用消息并接收它,
但是当我修改后发布回来时,我遇到了 SerializationException
我在这里设置了一个存储库:
https://github.com/manonworldrepository/spring-cloud-stream-cashcard-application
./gradlew bootBuildImage
docker compose up -d --build
docker logs enricher
然后你就能看到异常了
非常感谢您的帮助
您好,失败的原因是绑定器属性的顺序错误,有关更多详细信息,您可以查看 Spring 文档,您必须查看
kafka.bindings
和 "common" bindings
@Configuration
public class CashCardTransactionEnricher {
@Bean
EnrichmentService enrichmentService() {
return new EnrichmentService();
}
@Bean
public Function<Flux<List<Transaction>>, Flux<EnrichedTransaction>> enrichTransaction(EnrichmentService enrichmentService) {
return transaction ->
transaction.flatMap(Flux::fromIterable)
.map(enrichmentService::enrichTransaction);
}
配置:
spring:
application:
name: transaction-enricher-application
integration:
poller:
fixed-delay: 5000
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 # Switch here to local instance when running on localhost
bindings:
enrichTransaction-in-0:
consumer:
configuration:
value.deserializer: com.example.demo.serdes.TransactionDeserializer
enrichTransaction-out-0:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: com.example.demo.serdes.FluxSerializer
bindings:
enrichTransaction-in-0:
binder: kafka
group: enrich-consumer
consumer:
use-native-encoding: true
batch-mode: true
destination: approval-requests
enrichTransaction-out-0:
binder: kafka
destination: enriched-transactions
producer:
use-native-encoding: true
spring:
application:
name: transaction-sink-application
integration:
poller:
fixed-delay: 5000
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 # Switch here to local instance when running on localhost
bindings:
sinkToConsole-in-0:
destination: enriched-transactions
group: console-consumer
cashCardTransactionFileSink-in-0:
destination: enriched-transactions
group: file-sink-consumer
function:
definition: sinkToConsole;cashCardTransactionFileSink
spring:
application:
name: transaction-source-application
integration:
poller:
fixed-delay: 5000
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 # Switch here to local instance when running on localhost
bindings:
approvalRequest-out-0:
destination: approval-requests
binder: kafka