我无法理解,为什么我总是在我的 Spring Cloud Stream Kafka 拓扑中收到
TimeoutException
。
我的 Spring Cloud Stream 函数有以下配置:
spring:
application:
name: foo
cloud:
function:
definition: transformKey;mapData
stream:
bindings:
transformKey-in-0:
destination: incoming-topic
transformKey-out-0:
destination: repartitioned-topic
mapData-in-0:
destination: repartitioned-topic
mapData-in-1:
destination: joining-topic
mapData-out-0:
destination: converted-outcome-topic
kafka:
streams:
binder:
min-partition-count: 60
auto-add-partitions: true
required-acks: all
producer-properties:
retries: 2
functions:
transformKey:
applicationId: transform-key-appid
mapData:
applicationId: mapdata-appid
bindings:
mapData-in-1:
consumer:
materializedAs: joining-store
transformKey-out-0:
producer:
sync: true
目前我的重点是第一次迭代,我只是转换密钥,以便信息重新分区。键很小(只有几个字节),值可以有更多字节:1-4 KiB。
不幸的是我收到以下错误:
Expiring 3 record(s) for repartitioned-topic-55:120000 ms has passed since batch creation
我的预期是,当生产信息花费很长时间时,整个功能会运行得更慢。但事实并非如此。
transformKey-in-0
上的数据消耗仍在处理中,尽管无法尽快清除该批次。我无法想象,对于这种情况没有可靠的解决方案。或者我在这个过程中有误会。
有没有人有想法,如何解决这个问题?