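In the code below, I'm trying to call processOrder() through a REST endpoint to create a message. I then want to pass the result of processOrder() to processShipping() and processPayment(). However, whenever I call the REST endpoint http://localhost:8080/processOrder, only processOrder() is invoked. What is going on?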
package com.example.kafkademo.functions;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Consumer;
import java.util.function.Function;

@Configuration
public class MessageFunctions {

    @Bean
    public Function<String, String> processOrder() {
        return orderId -> {
            System.out.println("processOrder: " + orderId);
            System.out.println(orderId);
            return orderId + " : " + System.currentTimeMillis();
        };
    }

    @Bean
    public Consumer<String> processShipping() {
        return orderId -> {
            System.out.println("processShipping: " + orderId);
            System.out.println(orderId);
        };
    }

    @Bean
    public Consumer<String> processPayment() {
        return orderId -> {
            System.out.println("processPayment: " + orderId);
            System.out.println(orderId);
        };
    }
}
Here is application.yml:
spring:
  application:
    name: kafka-demo
  cloud:
    function:
      definition: processOrder;processPayment;processShipping
    stream:
      bindings:
        processOrder-out-0:
          destination: order_topic
        processPayment-in-0:
          destination: order_topic
        processShipping-in-0:
          destination: order_topic
  kafka:
    listener:
      port: 9094
    bootstrap-servers:
      - localhost:9094
Just in case, here are the dependencies:
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
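It looks like you are triggering the first function, processOrder, through the REST endpoint, so the function's return value goes back to the HTTP caller as the response. My guess is that the data is never published to the Kafka topic.
You could try changing your processOrder function to use StreamBridge, like this: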
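@Configuration
public class MessageFunctions {

    @Autowired
    StreamBridge streamBridge;

    @Bean
    public Function<String, String> processOrder() {
        return orderId -> {
            System.out.println("processOrder: " + orderId);
            System.out.println(orderId);
            // Build the payload once so Kafka and the HTTP response carry the same value.
            String message = orderId + " : " + System.currentTimeMillis();
            // Explicitly publish to order_topic; the return value only goes to the HTTP caller.
            streamBridge.send("order_topic", message);
            return message;
        };
    }
}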
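This way, the function explicitly sends the data to the outbound destination. You could also use a Consumer instead of a Function, but then again, you would still need to use StreamBridge.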
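To illustrate the difference described above, here is a minimal plain-Java sketch that involves neither Spring nor Kafka. InMemoryTopic is a hypothetical stand-in for order_topic, and processOrderAndPublish is an invented name for the StreamBridge variant; neither is part of any real API. It only models the idea that a function's return value reaches its direct caller, while subscribers see a message only if it is explicitly published.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class HttpVersusTopicSketch {

    /** Hypothetical in-memory stand-in for a Kafka topic; not a Spring or Kafka API. */
    public static class InMemoryTopic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        public void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        public void publish(String message) {
            // Deliver the message to every subscriber, in subscription order.
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    /** Mirrors the original processOrder: the result only goes back to the caller. */
    public static Function<String, String> processOrder() {
        return orderId -> orderId + " : processed";
    }

    /** Mirrors the StreamBridge variant: publish explicitly, then return the same value. */
    public static Function<String, String> processOrderAndPublish(InMemoryTopic topic) {
        return orderId -> {
            String message = orderId + " : processed";
            topic.publish(message);
            return message;
        };
    }

    public static void main(String[] args) {
        InMemoryTopic orderTopic = new InMemoryTopic();
        List<String> received = new ArrayList<>();
        orderTopic.subscribe(m -> received.add("processShipping: " + m));
        orderTopic.subscribe(m -> received.add("processPayment: " + m));

        // Direct invocation, like the HTTP call: the subscribers are never reached.
        String response = processOrder().apply("42");
        System.out.println(response + " -> messages delivered: " + received.size());

        // Explicit publish: both subscribers receive the message.
        processOrderAndPublish(orderTopic).apply("42");
        System.out.println("messages delivered: " + received);
    }
}
```

In the real application, the role of publish() is played by streamBridge.send(...), and the subscribers correspond to the processShipping and processPayment bindings on order_topic.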