I want to implement a reactive Kafka consumer like the one shown in the Reactor Kafka reference, section 6.8 "Concurrent Processing with Partition-Based Ordering" (https://projectreactor.io/docs/kafka/release/reference/):
Scheduler scheduler = Schedulers.newElastic("sample", 60, true);

KafkaReceiver.create(receiverOptions)
    .receive()
    .groupBy(m -> m.receiverOffset().topicPartition())
    .flatMap(partitionFlux ->
        partitionFlux.publishOn(scheduler)
            // processRecord is assumed to return the record's ReceiverOffset,
            // which is then committed below
            .map(r -> processRecord(partitionFlux.key(), r))
            .sample(Duration.ofMillis(5000))
            .concatMap(offset -> offset.commit()));
I want to trace each poll. The problem is that with groupBy, a traceId can be assigned only once per group.
I simplified the code to reproduce this without Kafka:
Flux
    .interval(Duration.ofMillis(100)) // kafka messages
    .filter {
        it % 2L == 0L // for simplicity, we will leave only one group
    }
    .groupBy {
        it % 2L
    }
    // .windowTimeout(2, Duration.ofMillis(10))
    .flatMap { group ->
        val traceId1 = UUID.randomUUID().toString() // for clarity, two traceId
        Flux.deferContextual {
            Mono.just(it.get<String>("traceId2")) // for clarity, two traceId
        }.flatMap { traceId2 ->
            group.bufferTimeout(2, Duration.ofMillis(10))
                .concatMap {
                    // here I handle my batch
                    Mono.just(it).delayElement(Duration.ofMillis(50))
                }.flatMap {
                    // and here I will commit
                    println("$it - $traceId1 - $traceId2")
                    Mono.just(it)
                }
        }.contextWrite {
            it.put("traceId2", UUID.randomUUID().toString())
        }
    }
    .blockLast()
The output is:
[0] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[2] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[4] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[6] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[8] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[10] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
The same traceId1 and the same traceId2 every time.
But if we imagine the grouping being done with windowTimeout instead, everything works as expected:
Flux
    .interval(Duration.ofMillis(100)) // kafka messages
    .filter {
        it % 2L == 0L // for simplicity, we will leave only one group
    }
    /*
    .groupBy {
        it % 2L
    }
    */
    .windowTimeout(2, Duration.ofMillis(10))
    .flatMap { group ->
        val traceId1 = UUID.randomUUID().toString() // for clarity, two traceId
        Flux.deferContextual {
            Mono.just(it.get<String>("traceId2")) // for clarity, two traceId
        }.flatMap { traceId2 ->
            group.bufferTimeout(2, Duration.ofMillis(10))
                .concatMap {
                    // here I handle my batch
                    Mono.just(it).delayElement(Duration.ofMillis(50))
                }.flatMap {
                    // and here I will commit
                    println("$it - $traceId1 - $traceId2")
                    Mono.just(it)
                }
        }.contextWrite {
            it.put("traceId2", UUID.randomUUID().toString())
        }
    }
    .blockLast()
We get the following result:
[0] - d571b08e-d14c-425e-9062-49de555b6b6e - edef203f-4727-4b9b-bc27-82743e8c16f3
[2] - 8b1b2998-ff83-4b09-990c-93b3ad8e386b - e4b00092-e4b7-4f54-aa8a-90ac4882dea3
[4] - 323450bf-6d2c-4e39-8534-7d7ea63fbc31 - 1be0022c-88f4-4339-a00f-7e9f4f5ed45d
[6] - 8db7d902-5047-4772-9ebb-9b8bef983126 - ce92b804-ac72-4af1-b353-5c7576d71a01
[8] - 838e230c-fdb2-4e33-901e-96b7cbf2543e - f355382e-54ef-4a02-b2ed-a32bf855563c
[10] - d8bfc951-98a1-428a-b697-ebc447a78a40 - 916d2824-4e76-47fb-8573-653e954841eb
Different traceId1 and traceId2 values for every window.
How can I achieve this with groupBy, or is there an alternative to groupBy for this case?
We can use collectMultimap instead of groupBy. groupBy emits each group Flux only once, so the flatMap lambda (and with it contextWrite and the traceId assignment) runs a single time per group for the whole stream. If we instead window each poll with windowTimeout and regroup it with collectMultimap, a fresh publisher is produced for every batch and the lambda runs again each time:
Flux.interval(Duration.ofMillis(100)) // kafka messages
    .windowTimeout(10, Duration.ofMillis(1000)) // kafka batch
    .flatMap { polling ->
        polling.collectMultimap {
            it % 2
        }.flatMapMany { map ->
            map.keys
                .map {
                    Flux.fromIterable(map[it]!!)
                }
                .let {
                    Flux.fromIterable(it)
                }
        }
    }
    .flatMap { group ->
        val traceId1 = UUID.randomUUID().toString() // for clarity, two traceId
        Flux.deferContextual {
            Mono.just(it.get<String>("traceId2")) // for clarity, two traceId
        }.flatMap { traceId2 ->
            group.bufferTimeout(2, Duration.ofMillis(10))
                .concatMap {
                    // here I handle my batch
                    Mono.just(it).delayElement(Duration.ofMillis(50))
                }.flatMap {
                    // and here I will commit
                    println("$it - $traceId1 - $traceId2")
                    Mono.just(it)
                }
        }.contextWrite {
            it.put("traceId2", UUID.randomUUID().toString())
        }
    }
    .blockLast()
This way every polled batch gets its own traceId.
The output is:
[0, 2, 4, 6, 8] - 0e42ebe5-2f59-49b4-9ee2-1d42708ecc70 - dd007120-b544-4d8c-b9aa-059cda8a4d53
[1, 3, 5, 7, 9] - a7a9d1c0-9d3c-4cda-9e43-aac0c86940a1 - ab82ae64-e512-4476-b2f3-1ccf3b15803c
[11, 13, 15, 17, 19] - 8cc98954-4e48-44eb-803a-8b243e0c6244 - 8e515381-16c0-4cb4-8923-0703397971e9
[10, 12, 14, 16, 18] - e3707e39-5b92-4289-93ef-a76104bf036f - 384fe34e-fb6b-4d8a-8261-52f2294f618c
[20, 22, 24, 26, 28] - f82d0776-cf15-4850-9c9c-1d4374f0a36e - bfbca427-5177-4c21-9da9-7182ac6ef6ad
[21, 23, 25, 27, 29] - f364f43c-584b-461b-9a4c-8874aa7c6327 - f89a6ffc-db12-4462-ba05-1fd4482785d4
[30, 32, 34, 36, 38] - ef0b8b7d-9ae4-4d1c-8567-0441a514a6f1 - 27aaeb7b-4866-43da-8b3c-0640ed53bf66
[31, 33, 35, 37, 39] - 6a7218cb-3173-432d-9d48-fa8459b5d6c0 - ba3fa6d8-3491-4e38-b8b0-f2540432a3a6
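For completeness, the same idea can be wired back into the KafkaReceiver pipeline from the question. This is only a minimal, untested sketch: receiverOptions, the window size/timeout, and processRecord (assumed here to take a record and a traceId and return a Mono of the same record) are placeholders, and whether one window really corresponds to one poll depends on the receiver configuration:

KafkaReceiver.create(receiverOptions)            // ReceiverOptions<String, String>, configured elsewhere
    .receive()                                   // Flux<ReceiverRecord<String, String>>
    .windowTimeout(500, Duration.ofMillis(1000)) // one window ~ one "poll" batch; sizes are arbitrary here
    .flatMap { poll ->
        // regroup the batch by partition, as with collectMultimap above
        poll.collectMultimap { it.receiverOffset().topicPartition() }
            .flatMapMany { byPartition ->
                Flux.fromIterable(byPartition.values.map { Flux.fromIterable(it) })
            }
    }
    .flatMap { partitionBatch ->
        Flux.deferContextual { ctx ->
            val traceId = ctx.get<String>("traceId") // fresh value per batch, written below
            partitionBatch
                // processRecord: hypothetical handler (record, traceId) -> Mono<ReceiverRecord<String, String>>;
                // concatMap keeps the per-partition order
                .concatMap { record -> processRecord(record, traceId) }
                .concatMap { record -> record.receiverOffset().commit().thenReturn(record) }
        }.contextWrite { it.put("traceId", UUID.randomUUID().toString()) }
    }
    // subscribe() and error handling omitted

The per-batch traceId is written once per window via contextWrite, so every poll-sized batch is traced separately while commits still happen in per-partition order.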