与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
我正在尝试调试我们的生产 Kafka Streams 应用程序中的问题。 (简化的)拓扑看起来像这样 builder.stream("输入").groupByKey().reduce( (agg, val) -> &quo...
有两个kafka主题 消息 图片 新闻主题中的消息可以包含图像 ID 列表,如下所示 { “id”:“新闻-1”, "title": "标题新闻-1", ...
我在 Kafka 流应用程序中编写了以下代码: KGroupedStream groupedStream = Stream.groupByKey(); groupedStream.windowedBy( SessionWindows.with(Duration.ofSeconds(3))....
具有基于线程并行性的 Kafka Streams 与用于并行处理的 Kafka Parallel Consumers
我们正在为实时协作形式构建事件驱动的架构。我们的解决方案使用 Kafka 作为事件代理,其中事件排序和有状态流处理是关键要求。 ...
我正在尝试找到某种方法来从手动偏移恢复 Kafka Streams。 通过在互联网上查找,我没有找到任何说可以的答案。 有什么办法吗?或者必须回到低水平
Kafka Streams - 为什么我不能聚合和总结我的多头?
我是 Kafka Streams 的新手,我正在尝试拼凑我的第一个应用程序。 我想将我的银行交易金额加起来。 @豆 公共 KStream kStream(StreamsBuilder
考虑这个 KStream: KStream inputStream = StreamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String())); 物化 考虑这个 KStream: KStream<String, String> inputStream = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String())); Materialized<String, List<String>, WindowStore<Bytes, byte[]>> with = Materialized.with(Serdes.String(), STRING_LIST_SERDE); KStream<Windowed<String>, List<String>> outputStream = inputStream .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2))) .aggregate( ArrayList::new, (key, string, aggregate) -> { aggregate.add(string); return aggregate; }, with) .toStream(); outputStream 将聚合来自 inputStream info 的所有消息,定义时间范围内的消息数组。 另外,现在我想将消息聚合到特定限制,例如直到列表大小不超过 50 条。 如果列表在聚合过程中变得大于 50,我想以某种方式将其拆分为附加列表。 基本上,我希望实现的输出是获取一组消息,其大小达到限制(例如 50 条),并且达到特定的时间范围,以先到者为准。 为了实现这一目标,我在这里缺少什么? 您可以尝试将 KTable 转换为 KStream 并执行 flatMapValues 来拆分列表,如下所示(Kotlin 中的代码): val s = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String())) val output = s .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2))) .aggregate({ mutableListOf() }, { k: String, str: String, agg: List<String> -> agg.plus(str) }) .toStream() .flatMapValues { strList -> strList.chunked(50) } 但这意味着您将整个聚合列表加载到内存中 - 可能会也可能不会是一个问题,具体取决于列表大小和您的内存设置,但这绝对是需要记住的事情。
Kafka Streams 用于聚合事件处理和对大量任务的可扩展性
我有以下任务:假设我正在开发一个由一百万用户使用的在线商店。用户在不同的时间进行购买。每个用户的每次购买都会变成一次交易事件。我...
Kafka Streams 和 CompletableFuture(或异步 java api)
我正在研究 Apache Kafka Stream SPI。我想知道是否有一种方法可以在 mapValues 方法内部执行异步代码。例如从外部存储检索数据。有没有办法整合...
我想提取每条消息中嵌入的时间戳并将它们作为 json 有效负载发送到我的数据库中。 我想获得以下三个时间戳。 事件时间:事件发生的时间点...
Kafka Streams GlobalKTable 并访问记录头
以同样的方式,KStream 和 KTable#toStream() 允许调用过程或转换,从而能够检查记录头,是否有一种方法可以通过 GlobalKTable 实现相同的目的。基本上,我是
我正在使用 Kafka 来处理一个有 4 个分区的主题。 Kafka 中消息的保留期 (TTL) 默认设置为 7 天。我正在运行一个非流式批处理作业,处理来自...的数据
消费Kafka主题时,Kafka会将所有消息按顺序传递给消费者。现在假设主题中的数据是实时数据,其中只有每个键的最新消息才重要,即所有
如何在 Kafka Streams Spring Boot 应用程序中启用 kafka 指标
Prometheus 端点正在工作,但没有获得 kafka 指标 我添加了 micrometer-prometheus 依赖项和 spring.jmx.enabled: true 并且需要管理配置
对包含逻辑删除值的 ktable 执行 groupby 会发生什么? 好像groupby没有被评估,但是tombstone会像filter方法d一样被转发吗...
我正在使用 Kafka 将数据保存到rocksdb 中。 现在我想看看 Kafka 创建的数据库键和值。 我下载了 FastNoSQL 并尝试但失败了。 该文件夹包含...
为什么 Apache Kafka Streams 使用 RocksDB 以及如何更改它?
在调查 Apache Kafka 0.9 和 0.10 的新功能时, 我们使用了 KStreams 和 KTables。有一个有趣的事实是,Kafka 内部使用的是 RocksDB。 请参阅卡夫卡街简介...
我使用Kafka Streams来处理用户数据的变化以及与用户操作相对应的事件。 我使用连接操作 (KStream-KTable) 丰富事件,然后将丰富的事件写入 Elastic...
Kafka Stream Transform - 按需获取数据并缓存,懒惰
当我们要求的密钥在其中找不到时,Kafka Streams 构建表/流来延迟获取数据的最佳方法是什么(如果有)? 假设有一个用户活动流 A...