这是一个关于流的二合一问题。
我正在开发一项由两个流组成的服务。一个(第一个)应该消耗整个主题,接收键/值对并将其信息存储在本地 HashMap 中
一旦该流不再有延迟,第二个流就会启动并消耗另一个主题。根据使用的数据及其属性之一,根据 HashMap 条目决定是删除记录还是进一步处理记录。 因此,第一个流需要在第二个流开始之前到达其主题的末尾。 我想保持服务无状态,因此不将数据保存在状态存储中。这会产生两个问题:
enable.auto.commit = false
auto.offset.reset = earliest
但是对于流来说这不起作用。我的临时解决方案是生成一个带有随机部分的 ApplicationID,以便忽略先前写入的偏移量。 这会为每个实例生成一个新的消费者组,从而在代理上产生许多组。
-> 有没有办法将流客户端配置为不写入偏移量?
try {
firstStreams.start();
// Waiting for consumer to start...
while (computeLag(firstStreams.metrics().entrySet()) < 1) {
sleep(100);
}
// Waiting for Lag to reach 0
while (computeLag(firstStreams.metrics().entrySet()) > 1) {
sleep(100);
}
secondStream.start();
shutdownLatch.await();
} catch (Throwable e) {
System.exit(1);
}
}
System.exit(0);
}
private static double computeLag(Set<? extends Map.Entry<MetricName, ? extends Metric>> metrics) {
return metrics.stream()
.filter(entries -> entries.getKey().name().equals("records-lag"))
.map(entry -> entry.getValue().metricValue().toString())
.map(Double::parseDouble)
.mapToDouble(Double::doubleValue)
.sum();
}
只要消费者未运行,消费者指标就会返回 0.0。成功启动后,它会返回所有分区的延迟。
这个版本可以工作,但似乎错误且复杂。
-> 有没有正确的方法来等待流到达“末尾”?
提前致谢并致以诚挚的问候
将流客户端配置为不写入偏移量?
据我所知,没有 Kafka Streams。至少,没有状态处理
有没有正确的方法来等待流到达“结束”?
没有。您所做的似乎不错,但我认为它无法扩展到一个分区/实例之外。
接收键/值对并将其信息存储在本地 HashMap 中
这并不是 Kafka Steams 真正可接受的用例。您应该使用 KTable 和交互式查询进行查找,或连接来过滤其他流