我正在尝试实现一个弹簧启动aws kinesis使用者能够自动缩放,以便与原始实例共享负载(拆分处理分片)。
我能够做到的:使用定义良好的读取和我在这里可用的示例Kinesis binder docs我已经能够启动多个消费者,实际上通过提供这些属性来划分碎片以进行处理。
在生产者上,我通过应用程序属性提供partitionCount:2。在消费者身上,我提供了instanceIndex和instanceCount。
在消费者1上我有instanceIndex = 0和instantCount = 2,在消费者2上我有instanceIndex = 1和instantCount = 2
这工作正常,我有两个春季启动应用程序处理他们的特定分片。但在这种情况下,我必须为每个启动应用程序预先配置一个属性文件,在加载时需要它们才能分割负载。如果我只启动第一个消费者(非自动缩放),我只处理特定于索引0的分片,而不处理其他分片。
我想做但不确定是否可以部署一个消费者(处理所有分片)。如果我部署另一个实例,我希望该实例重温一些负载的第一个消费者,换句话说,如果我有2个分片和一个消费者它将处理两个,如果我然后部署另一个应用程序我想第一个消费者到目前为止,只从单个分片进行处理,将第二个分片留给第二个分片。
我试图通过不在消费者上指定instanceIndex或instanceCount并且仅提供组名来尝试这样做,但是在第一个消费者关闭之前使第二个消费者空闲。仅供参考我还创建了自己的元数据和锁定表,防止绑定器创建默认值。
配置:制作人-----------------
originator: KinesisProducer
server:
port: 8090
spring:
cloud:
stream:
bindings:
output:
destination: <stream-name>
content-type: application/json
producer:
headerMode: none
partitionKeyExpression: headers.type
消费者-------------------------------------
originator: KinesisSink
server:
port: 8091
spring:
cloud:
stream:
kinesis:
bindings:
input:
consumer:
listenerMode: batch
recordsLimit: 10
shardIteratorType: TRIM_HORIZON
binder:
checkpoint:
table: <checkpoint-table>
locks:
table: <locking-table
bindings:
input:
destination: <stream-name>
content-type: application/json
consumer:
concurrency: 1
listenerMode: batch
useNativeDecoding: true
recordsLimit: 10
idleBetweenPolls: 250
partitioned: true
group: mygroup
那是对的。这就是它现在的工作原理:如果有一个消费者,它需要所有的分片进行处理。第二个只有在第一个碎片以某种方式被破坏至少一个碎片时才会采取行动。
适合Kafka的重新平衡就在我们的路线图上。我们还没有坚实的愿景,所以欢迎关于此事和随后的贡献的问题!