我正在尝试使用 Spring Cloud Stream 的 Kafka Streams binder 从 Kafka 读取事件,将事件按一分钟的时间窗口进行聚合,然后把聚合结果写入另一个主题。之后,我需要从该主题中读取聚合后的事件并将其写入另一个主题,同时把该主题与另一个 Kafka 集群中的主题绑定。但是我得到了下面的 binder 异常。
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: The binder 'kafkaha' cannot bind a com.sun.proxy.$Proxy155
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:163)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.expediagroup.platform.StreamingApplication.main(StreamingApplication.java:11)
Caused by: java.lang.IllegalStateException: The binder 'kafkaha' cannot bind a com.sun.proxy.$Proxy155
at org.springframework.util.Assert.state(Assert.java:73)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:194)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:130)
at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:337)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:229)
at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindOutputs(BindableProxyFactory.java:287)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 14 common frames omitted
我参照了链接中的示例,并尝试了以下配置和代码。
application.properties
# Spring Cloud Stream configuration for two pipelines:
#   * Kafka Streams:   inputKstream -> (windowed aggregation) -> bridge
#   * message channel: input -> output
# using two named binders (kafkaha, kafkaeg).
#
# The nesting below is reconstructed. The path of the Kafka Streams binder
# section is fixed by the @Value placeholder in ExecutorHaAgg
# (spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url).
spring:
  # Was misspelled "applicaiton.name", which leaves spring.application.name unset.
  application.name: eg-destination-attribute-store-ha-search-stream
  cloud:
    consul:
      host: localhost
      port: 8500
      discovery:
        instanceId: eg-destination-attribute-store-ha-search-stream
    stream:
      kafka:
        streams:
          # One-minute window advancing by one minute (values in milliseconds).
          timeWindow:
            length: 60000
            advanceBy: 60000
          bindings:
            inputKstream:
              consumer:
                autoCommitOffset: true
                startOffset: earliest
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            bridge:
              producer:
                # NOTE(review): ExecutorHaAgg.process returns a stream keyed by
                # Windowed<String>; confirm a plain String key serde is intended.
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          binder:
            brokers: kafka.us-east-1.stage.kafka.away.black:9092
            configuration:
              schema.registry.url: http://kafka-schema-registry.us-east-1.stage.kafka.away.black:8081
              commit.interval.ms: 1000
              application.id: eg-test-dev # arbitrary id; must uniquely identify this application
              # Kafka Streams client property, so it is kept with the other client
              # settings. NOTE(review): the source nesting was ambiguous — confirm.
              num.stream.threads: 1
            autoAddPartitions: false
            minPartitionCount: 1
      bindings:
        # NOTE(review): inputKstream and bridge are KStream bindings, yet they
        # select binder "kafkaha", which is declared below with type "kafka".
        # This is presumably what "The binder 'kafkaha' cannot bind a
        # com.sun.proxy.$Proxy..." reports; KStream bindings normally need a
        # binder of type "kstream" — confirm against the binder documentation.
        inputKstream:
          destination: business-events-search-event
          binder: kafkaha
          group: grp-eg-destination-attribute-store-ha-search-stream-ha
          consumer:
            useNativeDecoding: true
        # NOTE(review): bridge and output are declared as outputs in the Java
        # code; the consumer.* (and group) settings on them presumably have no
        # effect — producer.useNativeEncoding may be what was intended.
        bridge:
          destination: business-events-search-event-agg
          binder: kafkaha
          #group: grp-eg-destination-attribute-store-ha-search-stream
          consumer:
            useNativeDecoding: true
        # NOTE(review): as written, "output" writes to the aggregate topic on
        # kafkaha and "input" reads from the -eg topic on kafkaeg; verify this
        # direction matches the intended agg-topic -> other-cluster transfer.
        output:
          destination: business-events-search-event-agg
          binder: kafkaha
          group: grp-eg-destination-attribute-store-ha-search-stream-eg-in
          consumer:
            useNativeDecoding: true
        input:
          destination: business-events-search-event-eg
          binder: kafkaeg
          group: grp-eg-destination-attribute-store-ha-search-stream-eg
          consumer:
            useNativeDecoding: true
      binders:
        kafkaha:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: kafka.us-east-1.stage.kafka.away.black:9092
        kafkaeg:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
ExecutorHaAgg.java
/**
 * Kafka Streams processor bound through {@link EgSrcSinkProcessor}.
 *
 * <p>Consumes {@code SearchBusinessEvent}s from the {@code inputKstream} binding,
 * re-keys them by search term, aggregates them over the injected
 * {@link TimeWindows} into {@code ResultValue}s, and sends the resulting stream
 * to the {@code bridge} binding.
 */
@Slf4j
@EnableBinding(EgSrcSinkProcessor.class)
public class ExecutorHaAgg {

    // No longer read by process(): the Avro serde that was built from it was never used.
    // NOTE(review): kept so the placeholder is still resolved at injection time;
    // drop the field if that startup check is not wanted.
    @Value("${spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url}")
    private String schemaRegistryUrl;

    @Autowired
    private LookNPersistService service;

    @Autowired
    private TimeWindows timeWindows;

    /**
     * Builds the aggregation topology.
     *
     * <p>Events without a visitor UUID or without any search-term UUID are dropped.
     * Each remaining event is re-keyed by the search term derived from its first
     * search-term UUID and reduced to a {@code TransformedValue}; values are then
     * grouped by key, windowed, and folded into a {@code ResultValue} per window.
     *
     * <p>NOTE(review): the returned stream is keyed by {@code Windowed<String>};
     * confirm the key serde configured for the {@code bridge} producer can
     * serialize that type.
     *
     * @param inputKstream stream of search events from the {@code inputKstream} binding
     * @return windowed per-search-term aggregates, sent to {@code bridge}
     */
    @Timed(value = "kstream.BusinessModelMaskActiveLogV2.process.time", percentiles = {0.5, 0.9, 0.99}, histogram = true)
    @StreamListener
    @SendTo("bridge")
    public KStream<Windowed<String>, ResultValue> process(@Input("inputKstream") KStream<String, SearchBusinessEvent> inputKstream) {
        final TransformedValueSerde transformedValueSerde = new TransformedValueSerde();
        final ResultValueSerde resultValueSerde = new ResultValueSerde();

        return inputKstream
                .filter((k, v) -> v.getVisitorUuid() != null
                        && v.getSearchTermUUIDs() != null
                        && !v.getSearchTermUUIDs().isEmpty())
                .map((k, v) -> KeyValue.pair(
                        StringUtil.getSearchTermFromUri(v.getSearchTermUUIDs().get(0)),
                        new TransformedValue(v.getAvailabilityStart(), v.getAvailabilityEnd(), v.getHeader().getTime())))
                .groupBy((k, v) -> k, Serialized.with(Serdes.String(), transformedValueSerde))
                .windowedBy(timeWindows)
                .aggregate(ResultValue::new, ExecutorHaAgg::accumulate, Materialized.with(Serdes.String(), resultValueSerde))
                .toStream();
    }

    /**
     * Folds one value into the running aggregate of its window.
     *
     * <p>Records the key as the search term, keeps the larger of the two times,
     * increments the dated count unless
     * {@code StringUtil.isDatedStrNullAndEmpty} returns true for the value's
     * start/end dates, and always increments the total count.
     *
     * @param key       search term the value is grouped under
     * @param value     value to fold in
     * @param aggregate running aggregate; mutated and returned
     * @return the same {@code aggregate} instance
     */
    private static ResultValue accumulate(String key, TransformedValue value, ResultValue aggregate) {
        aggregate.setSearchTerm(key);

        final boolean valueIsNewer = aggregate.getTime() < value.getTime();
        aggregate.setTime(valueIsNewer ? value.getTime() : aggregate.getTime());

        final boolean undated = StringUtil.isDatedStrNullAndEmpty(value.getStartDate(), value.getEndDate());
        aggregate.setDatedCount(undated ? aggregate.getDatedCount() : 1 + aggregate.getDatedCount());

        aggregate.setCount(1 + aggregate.getCount());
        return aggregate;
    }
}
Transporter.java
/**
 * Pass-through bridge between the {@link Processor} channels: whatever arrives
 * on {@link Processor#INPUT} is handed back unchanged for {@link Processor#OUTPUT}.
 */
@Slf4j
@EnableBinding(Processor.class)
public class Transporter {

    /**
     * Relays a payload without inspecting or modifying it.
     *
     * @param object payload received on the input channel
     * @return the very same instance, to be sent to the output channel
     */
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Object transfer(final Object object) {
        return object;
    }
}
EgSrcSinkProcessor.java
/**
 * Binding contract enabled by {@code ExecutorHaAgg}: one Kafka Streams input
 * named {@code inputKstream} and one Kafka Streams output named {@code bridge}.
 */
public interface EgSrcSinkProcessor {

    /** Inbound stream bound under the name {@code inputKstream}. */
    @Input("inputKstream")
    KStream<?, ?> inputKstream();

    /** Outbound stream bound under the name {@code bridge}. */
    @Output("bridge")
    KStream<?, ?> bridgeKstream();
}