无法使用Spring Cloud Kafka Binder处理消息

问题描述 投票:0回答:1

在下面的代码中,我尝试通过 REST 端点调用

processOrder()
来创建消息。然后,我想将
processOrder()
的结果传递给
processShipping()
processPayment

但是,每当我调用其余端点

http://localhost:8080/processOrder
时,只会调用
processOrder()
。这是怎么回事?

package com.example.kafkademo.functions;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Consumer;
import java.util.function.Function;

@Configuration
public class MessageFunctions {

    @Bean
    public Function<String, String> processOrder(){
        return orderId -> {
            System.out.println("processOrder: " + orderId);
            System.out.println(orderId);
            return orderId + " : " + System.currentTimeMillis();
        };
    }

    @Bean
    public Consumer<String> processShipping(){
        return orderId -> {
            System.out.println("processShipping: " + orderId);
            System.out.println(orderId);
        };
    }

    @Bean
    public Consumer<String> processPayment(){
        return orderId -> {
            System.out.println("processPayment: " + orderId);
            System.out.println(orderId);
        };
    }
}

这是

application.yml

spring:
  application:
      name: kafka-demo

  cloud:
    function:
      definition: processOrder;processPayment;processShipping
    stream:
      bindings:
        processOrder-out-0:
          destination: order_topic
        processPayment-in-0:
          destination: order_topic
        processShipping-in-0:
          destination: order_topic

  kafka:
    listener:
      port: 9094
    bootstrap-servers:
      - localhost:9094

以防万一,以下是依赖项:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-web' 
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
java spring apache-kafka spring-cloud-stream
1个回答
0
投票

似乎您通过 REST 端点触发了第一个函数 -

processOrder
,因此该函数的响应将返回到 HTTP 调用。我的猜测是数据没有发布到Kafka主题。

您可以尝试更改您的功能

processOrder
以使用
StreamBridge
,如下所示:

@Autowired StreamBridge streamBridge;

@Bean
public Function<String, String> processOrder(){
        return orderId -> {
            System.out.println("processOrder: " + orderId);
            System.out.println(orderId);

            streamBridge.send("order_topic",  orderId + " : " + System.currentTimeMillis());

            return orderId + " : " + System.currentTimeMillis();
        };
    }

}

这样,函数就显式地将数据发送到出站。您也可以使用

Consumer
代替
Function
,但话又说回来,您需要使用
StreamBridge

© www.soinside.com 2019 - 2024. All rights reserved.