Spring Cloud Stream serde: SerializationException prevents a message from being published after it is consumed and modified


My configuration is as follows:

spring:
  application:
    name: transaction-enricher-application
  integration:
    poller:
      fixed-delay: 5000
  cloud:
    stream:
      kafka:
        binder:
          brokers: broker:9092 # Switch here to local instance when running on localhost
      bindings:
        enrichTransaction-in-0:
          consumer:
            batch-mode: true
            configuration:
              value:
                deserializer: com.example.demo.serdes.TransactionDeserializer
          destination: approvalRequest-out-0
        enrichTransaction-out-0:
          producer:
            useNativeEncoding: true
            configuration:
              value:
                serializer: com.example.demo.serdes.EnrichedTransactionSerializer

This is my service class, which receives and publishes messages:

package com.example.demo.enricher;

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Bean;
import com.example.demo.service.EnrichmentService;
import java.util.function.Function;
import com.example.demo.domain.EnrichedTransaction;
import com.example.demo.domain.Transaction;

@Configuration
public class CashCardTransactionEnricher {

    @Bean
    EnrichmentService enrichmentService() {
        return new EnrichmentService();
    }

    @Bean
    public Function<Transaction, EnrichedTransaction> enrichTransaction(EnrichmentService enrichmentService) {
        return transaction -> {
            return enrichmentService.enrichTransaction(transaction);
        };
    }

}

My serializer looks like this:

package com.example.demo.serdes;

import com.example.demo.domain.EnrichedTransaction;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class EnrichedTransactionSerializer implements Serializer<EnrichedTransaction> {
    private JsonSerializer<EnrichedTransaction> jsonSerializer = new JsonSerializer<>();

    @Override
    public byte[] serialize(String topic, EnrichedTransaction data) {
        return jsonSerializer.serialize(topic, data);
    }
}

My deserializer looks like this:

package com.example.demo.serdes;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.example.demo.domain.Transaction;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;
import java.io.IOException;
import com.example.demo.domain.CashCard;

public class TransactionDeserializer implements Deserializer<Transaction> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public Transaction deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }

        ObjectMapper objectMapper = new ObjectMapper();

        try {
            return objectMapper.readValue(data, Transaction.class);
        } catch (JsonProcessingException e) {
            System.out.println("JsonProcessingException occurred while deserializing Transaction: " + e.getMessage());
        } catch (Exception e) {
            System.out.println("Error deserializing Transaction: " + e.getMessage());
            return new Transaction(0L, new CashCard(0L, "Error", 0.0));
        }

        return new Transaction(1L, new CashCard(1L, "Test Owner", 3.14)); // just to avoid any errors
    }

    @Override
    public void close() {}
}

Finally, my enrichment service, which approves transactions, looks like this:

package com.example.demo.service;

import java.util.UUID;

import com.example.demo.domain.ApprovalStatus;
import com.example.demo.domain.CardHolderData;
import com.example.demo.domain.EnrichedTransaction;
import com.example.demo.domain.Transaction;

public class EnrichmentService {
    public EnrichedTransaction enrichTransaction(Transaction transaction) {
        return new EnrichedTransaction(
            transaction.id(),
            transaction.cashCard(),
            ApprovalStatus.APPROVED,
            new CardHolderData(
                UUID.randomUUID(),
                transaction.cashCard().owner(),
                "123 Main street"
            )
        );
    }
}

The error I get is this:

Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class reactor.core.publisher.FluxPeekFuseable to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer

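I really don't know what the root cause of this problem is.

I consume the message and receive it correctly, but when I publish it back after modifying it, I get a SerializationException.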

I have set up a repository to reproduce it here:

https://github.com/manonworldrepository/spring-cloud-stream-cashcard-application

  1. ./gradlew bootBuildImage
  2. docker compose up -d --build
  3. docker logs enricher

You will then see the exception in the logs.

Thank you very much for your help.

spring spring-boot serialization spring-cloud-stream spring-cloud-stream-binder-kafka
1 Answer

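Hello. The failure is caused by where the binding properties are nested. Kafka-specific settings such as configuration (serializers and deserializers) belong under spring.cloud.stream.kafka.bindings, while binder-agnostic settings such as destination, group, and batch-mode belong under spring.cloud.stream.bindings. For more details, see these two sections of the Spring Cloud Stream documentation:

kafka.bindings
"common" bindings

Here is what needs to change: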

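import java.util.List;
import java.util.function.Function;

import com.example.demo.domain.EnrichedTransaction;
import com.example.demo.domain.Transaction;
import com.example.demo.service.EnrichmentService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;

@Configuration
public class CashCardTransactionEnricher {

    @Bean
    EnrichmentService enrichmentService() {
        return new EnrichmentService();
    }

    // With batch-mode enabled, each emitted element is a List<Transaction>;
    // flatten every batch and enrich the transactions one by one.
    @Bean
    public Function<Flux<List<Transaction>>, Flux<EnrichedTransaction>> enrichTransaction(EnrichmentService enrichmentService) {
        return transactions ->
                transactions.flatMap(Flux::fromIterable)
                        .map(enrichmentService::enrichTransaction);
    }
}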
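The function above flattens each incoming batch before enriching its elements. As a dependency-free sketch of that flatten-then-map shape, the following uses java.util.stream in place of Reactor's Flux. The record types and the enrich helper are hypothetical stand-ins for the question's domain classes and EnrichmentService, not the real ones.

```java
import java.util.List;
import java.util.stream.Collectors;

public class BatchFlattenSketch {

    // Hypothetical, simplified stand-ins for the question's domain types.
    record Transaction(long id, String owner) {}
    record EnrichedTransaction(long id, String owner, String status) {}

    // Stand-in for EnrichmentService.enrichTransaction (assumption: always approves).
    static EnrichedTransaction enrich(Transaction t) {
        return new EnrichedTransaction(t.id(), t.owner(), "APPROVED");
    }

    // Flatten every batch into single transactions, then enrich each one.
    static List<EnrichedTransaction> enrichBatches(List<List<Transaction>> batches) {
        return batches.stream()
                .flatMap(List::stream)
                .map(BatchFlattenSketch::enrich)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Transaction>> batches = List.of(
                List.of(new Transaction(1L, "Alice"), new Transaction(2L, "Bob")),
                List.of(new Transaction(3L, "Carol")));
        // Two batches holding three transactions become three enriched results.
        enrichBatches(batches).forEach(System.out::println);
    }
}
```

In the Reactor version, flatMap(Flux::fromIterable) plays the role of flatMap(List::stream) here, so the output binding receives individual EnrichedTransaction values rather than lists.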

Configuration (for the enricher, sink, and source applications, in that order):

spring:
  application:
    name: transaction-enricher-application
  integration:
    poller:
      fixed-delay: 5000
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092 # Switch here to local instance when running on localhost
        bindings:
          enrichTransaction-in-0:
            consumer:
              configuration:
                value.deserializer: com.example.demo.serdes.TransactionDeserializer
          enrichTransaction-out-0:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: com.example.demo.serdes.FluxSerializer
      bindings:
        enrichTransaction-in-0:
          binder: kafka
          group: enrich-consumer
          consumer:
            use-native-decoding: true
            batch-mode: true
          destination: approval-requests
        enrichTransaction-out-0:
          binder: kafka
          destination: enriched-transactions
          producer:
            use-native-encoding: true

spring:
  application:
    name: transaction-sink-application
  integration:
    poller:
      fixed-delay: 5000
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092 # Switch here to local instance when running on localhost
      bindings:
        sinkToConsole-in-0:
          destination: enriched-transactions
          group: console-consumer
        cashCardTransactionFileSink-in-0:
          destination: enriched-transactions
          group: file-sink-consumer
    function:
      definition: sinkToConsole;cashCardTransactionFileSink

spring:
  application:
    name: transaction-source-application
  integration:
    poller:
      fixed-delay: 5000
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092 # Switch here to local instance when running on localhost
      bindings:
        approvalRequest-out-0:
          destination: approval-requests
          binder: kafka
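
As background on the exception in the question: the Kafka producer reports "Can't convert value of class ... to class ... specified in value.serializer" when the configured serializer is handed an object of a type it cannot accept. That is consistent with the custom value.serializer not being picked up, so that the producer falls back to ByteArraySerializer and receives a payload that is not a byte[]. The sketch below reproduces the mechanism in plain Java. The Serializer interface and ByteArraySerializer class here are hypothetical, simplified stand-ins, not the Kafka classes, and trySend is an illustrative helper.

```java
public class SerializerMismatchSketch {

    // Hypothetical, simplified stand-in for Kafka's Serializer<T> interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a byte-array serializer: it can only pass byte[] through.
    static class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    // Mimics a producer that calls its serializer without knowing the payload type.
    // The raw-type call compiles, but fails at runtime if the value is not a byte[].
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String trySend(Serializer serializer, Object value) {
        try {
            serializer.serialize("enriched-transactions", value);
            return "sent";
        } catch (ClassCastException e) {
            return "Can't convert value of class " + value.getClass().getName()
                    + " to class " + serializer.getClass().getName();
        }
    }

    public static void main(String[] args) {
        // A byte[] payload is accepted.
        System.out.println(trySend(new ByteArraySerializer(), new byte[] {1, 2}));
        // Any other payload type triggers the conversion failure.
        System.out.println(trySend(new ByteArraySerializer(), new StringBuilder("not bytes")));
    }
}
```

With native encoding enabled, the binder passes the function's return value to the producer's serializer unchanged, so the serializer configured under the Kafka binding must match the payload type.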