当我在函数中处理的消息不遵循某些逻辑时,我正在尝试向 DLQ 发送消息。
对于某些上下文,我在 application.yaml 中有此配置 我的函数如下所示:
function:
definition: filterConsumption|EnrichConsumption;
#Functions and topic binding
stream:
bindings:
filterConsumptionEnrichConsumption-in-0:
destination: input
filterConsumptionEnrichConsumption-out-0:
destination: output
kafka:
streams:
bindings:
filterConsumptionEnrichConsumption-in-0:
consumer:
enable-dlq: true
dlqName: input_dlq
application-id: input-application-id
filterConsumptionEnrichConsumption-out-1:
consumer:
enable-dlq: false
application-id: output-application-id
binder:
#Kafka consumer config
replicationFactor: ${KAFKA_STREAM_REPLICATION_FACTOR:1}
brokers: ${BOOTSTRAP_SERVERS_CONFIG:localhost:9092}
deserialization-exception-handler: sendToDlq
我的函数如下所示:
@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {
return input ->
input.filter((key, consumptions) -> !getSomething(consumptions).orElse("").isBlank())
.merge(
//filter consumptions having a tradingName
input.filter((key, consumptions) -> getSomething(consumptions).orElse("").isBlank())
//enrich consumptions with missing tradingName
.mapValues(this::setSomething)
);
}
在“setSomething”过程中,由于逻辑规则,可能会出现一些异常。
我尝试了两件事: 第一次使用 StreamBridge,但我不断收到以下错误:
Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
我尝试这样配置streamBridge:
private final StreamBridge streamBridge;
@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {
return input ->
input.filter((key, consumptions) ->
!getSomething(consumptions).orElse("").isBlank())
.merge(
//filter consumptions having a tradingName
input.filter((key, consumptions) ->
getSomething(consumptions).orElse("").isBlank())
//enrich consumptions with missing tradingName
.mapValues((ConsumptionSchema value) -> {
try {
return setSomething(value);
} catch (DlqException e) {
Message<ConsumptionSchema> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("filterConsumptionEnrichConsumption-in-1", mess);
return null;
}
})
);
}
我还尝试使用 SendToDlqAndContinue,使用这样的处理器:
@Autowired
private final SendToDlqAndContinue dlqHandler;
@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {
return input ->
input.process(() -> new Processor<String, ConsumptionSchema, String, ConsumptionSchema>() {
ProcessorContextImpl context;
@Override
public void init(ProcessorContext context) {
this.context = (ProcessorContextImpl) context;
}
@Override
public void process(Record<String, ConsumptionSchema> processInput) {
input.filter((key, consumptions) ->
!getSomething(consumptions).orElse("").isBlank())
.merge(
//filter consumptions having a tradingName
input.filter((key, consumptions) ->
getSomething(consumptions).orElse("").isBlank())
//enrich consumptions with missing tradingName
.mapValues((ConsumptionSchema value) -> {
try {
return setSomething(value);
} catch (DlqException e) {
log.error("Exception during handling of consumption message : {}, message : {}",
processInput.key(), e.getMessage());
dlqHandler.sendToDlq(
new ConsumerRecord<>(
context.topic(),
context.partition(),
context.offset(),
processInput.key(),
processInput.value()), e);
return null;
}
})
);
}
在这种情况下,我不明白为什么,似乎没有调用 process 方法。
任何人都可以帮助我使用 SendToDlqAndContinue (首选解决方案)或 StreamBridge 使其正常工作?
编辑:
使用与第一部分相同的 application.yaml,我尝试了 DltAwareProcessor :
@Configuration
@Data
@Slf4j
public class BillableConsumptionFilterStreaming {
@Bean("filterConsumption")
public Function<KStream<String, ConsumptionSchema>,
KStream<String, ConsumptionSchema>> filterConsumption(DltSenderContext dltSenderContext) {
return input ->
input.process(() ->
new DltAwareProcessor<>(
(BiFunction<String, ConsumptionSchema, KeyValue<String, ConsumptionSchema>>) (s, c) -> {
throw new RuntimeException("Exception that won't kill stream");
}, "input_dlq", dltSenderContext));
}
使用断点,正确调用 DltAwareProcessor,直到这一行:streamBridge.send(this.dltDestination, r.value());
没有抛出异常或任何异常,但我收到以下日志:
Using kafka topic for outbound: input_dlq
Node -1 disconnected.
Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
在我的应用程序中,BOOTSTRAP_SERVERS_CONFIG被我们的kafka的地址覆盖,并且当没有异常时,消息被正确路由到输出主题。所以也许我在 application.yaml 中缺少一些配置来为 StreamBridge 配置代理。
SendToDlqAndContinue
是为了反序列化异常处理目的而显式构建的。您不能像您尝试的那样将其用于运行时错误处理。我们最近(在 4.1.0-SNAPSHOT
行中)推出了一项新功能,可能有助于此用例。请参阅以下问题以了解更多详细信息。
https://github.com/spring-cloud/spring-cloud-stream/issues/2779
https://github.com/spring-cloud/spring-cloud-stream/issues/2802
看看这是否适合您的用例。如果您发现此功能有任何改进空间,请对上面的问题 (2802) 发表评论,我们仍然可以做到这一点,因为我们将在本月晚些时候发布
4.1.0-RC1
版本。