apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

查看在流应用程序的输出中创建的重复记录

我有一个 Kafka Streams 应用程序,它从 Kafka 主题获取输入,在 5 分钟窗口内将其聚合到原始值的三个字段上。 在输出方面,我需要翻译 aggre...

回答 1 投票 0

Kafka Streams 从 JSONObject 创建空模式

我正在编写一个 Kafka Streams 应用程序,该应用程序将从非关系自定义 Kafka 源连接器接收的数据进行转换,并将其拆分为多个主题以对其进行规范化(以便可以使用...

回答 1 投票 0

Kafka Streams 状态存储与 MongoDB 的状态管理

我正在开发一个使用 Kafka Streams 进行组件之间通信的分布式系统。 其中一个组件(为了简单起见,BRAIN)管理发送给其他组件的一系列消息(A,...

回答 1 投票 0

如何避免使用KafaConsumer API消费kafka消息的延迟

我需要从 Kafka 主题快速检索 1000 条消息,但初始检索很慢 kafka-clients 3.6.1 KafkaConsumer API。 我们正在从旧的 Kafka 客户端(版本 0.8.1)迁移到...

回答 1 投票 0

应用程序仅针对 1 个输入生成 2 条消息

我正在尝试调试我们的生产 Kafka Streams 应用程序中的问题。 (简化的)拓扑看起来像这样 builder.stream("输入").groupByKey().reduce( (agg, val) -> &quo...

回答 1 投票 0

Kafka 流提交偏移语义

我只是想确认一些我认为在文档行之间的内容。说 kafka 流中的提交独立于偏移量/消息是否已正确,是否正确?

回答 2 投票 0

Kafka 流外键与一对多关系连接

有两个kafka主题 消息 图片 新闻主题中的消息可以包含图像 ID 列表,如下所示 { “id”:“新闻-1”, "title": "标题新闻-1", ...

回答 1 投票 0

Kafka Stream 抑制会话窗口聚合

我在 Kafka 流应用程序中编写了以下代码: KGroupedStream groupedStream = Stream.groupByKey(); groupedStream.windowedBy( SessionWindows.with(Duration.ofSeconds(3))....

回答 3 投票 0

具有基于线程并行性的 Kafka Streams 与用于并行处理的 Kafka Parallel Consumers

我们正在为实时协作形式构建事件驱动的架构。我们的解决方案使用 Kafka 作为事件代理,其中事件排序和有状态流处理是关键要求。 ...

回答 1 投票 0

从自定义偏移量恢复 Kafka Stream

我正在尝试找到某种方法来从手动偏移恢复 Kafka Streams。 通过在互联网上查找,我没有找到任何说可以的答案。 有什么办法吗?或者必须回到低水平

回答 1 投票 0

Kafka Streams - 为什么我不能聚合和总结我的多头?

我是 Kafka Streams 的新手,我正在尝试拼凑我的第一个应用程序。 我想将我的银行交易金额加起来。 @豆 公共 KStream kStream(StreamsBuilder

回答 1 投票 0

如何在特定时间范围内聚合KStream到固定大小的列表?

考虑这个 KStream: KStream inputStream = StreamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String())); 物化 考虑这个 KStream: KStream<String, String> inputStream = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String())); Materialized<String, List<String>, WindowStore<Bytes, byte[]>> with = Materialized.with(Serdes.String(), STRING_LIST_SERDE); KStream<Windowed<String>, List<String>> outputStream = inputStream .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2))) .aggregate( ArrayList::new, (key, string, aggregate) -> { aggregate.add(string); return aggregate; }, with) .toStream(); outputStream 将聚合来自 inputStream info 的所有消息,定义时间范围内的消息数组。 另外,现在我想将消息聚合到特定限制,例如直到列表大小不超过 50 条。 如果列表在聚合过程中变得大于 50,我想以某种方式将其拆分为附加列表。 基本上,我希望实现的输出是获取一组消息,其大小达到限制(例如 50 条),并且达到特定的时间范围,以先到者为准。 为了实现这一目标,我在这里缺少什么? 您可以尝试将 KTable 转换为 KStream 并执行 flatMapValues 来拆分列表,如下所示(Kotlin 中的代码): val s = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String())) val output = s .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2))) .aggregate({ mutableListOf() }, { k: String, str: String, agg: List<String> -> agg.plus(str) }) .toStream() .flatMapValues { strList -> strList.chunked(50) } 但这意味着您将整个聚合列表加载到内存中 - 可能会也可能不会是一个问题,具体取决于列表大小和您的内存设置,但这绝对是需要记住的事情。

回答 1 投票 0

Kafka Streams 用于聚合事件处理和对大量任务的可扩展性

我有以下任务:假设我正在开发一个由一百万用户使用的在线商店。用户在不同的时间进行购买。每个用户的每次购买都会变成一次交易事件。我...

回答 1 投票 0

Kafka Streams 和 CompletableFuture(或异步 java api)

我正在研究 Apache Kafka Stream SPI。我想知道是否有一种方法可以在 mapValues 方法内部执行异步代码。例如从外部存储检索数据。有没有办法整合...

回答 1 投票 0

如何提取 Kafka Streams 中消息中嵌入的时间戳

我想提取每条消息中嵌入的时间戳并将它们作为 json 有效负载发送到我的数据库中。 我想获得以下三个时间戳。 事件时间:事件发生的时间点...

回答 1 投票 0

Kafka Streams GlobalKTable 并访问记录头

以同样的方式,KStream 和 KTable#toStream() 允许调用过程或转换,从而能够检查记录头,是否有一种方法可以通过 GlobalKTable 实现相同的目的。基本上,我是

回答 2 投票 0

保留策略删除旧消息后,Kafka 会继续分配顺序偏移吗?

我正在使用 Kafka 来处理一个有 4 个分区的主题。 Kafka 中消息的保留期 (TTL) 默认设置为 7 天。我正在运行一个非流式批处理作业,处理来自...的数据

回答 1 投票 0

实时处理中始终仅处理最新消息(每个键)

消费Kafka主题时,Kafka会将所有消息按顺序传递给消费者。现在假设主题中的数据是实时数据,其中只有每个键的最新消息才重要,即所有

回答 2 投票 0

如何在 Kafka Streams Spring Boot 应用程序中启用 kafka 指标

Prometheus 端点正在工作,但没有获得 kafka 指标 我添加了 micrometer-prometheus 依赖项和 spring.jmx.enabled: true 并且需要管理配置

回答 0 投票 0

对墓碑值执行分组

对包含逻辑删除值的 ktable 执行 groupby 会发生什么? 好像groupby没有被评估,但是tombstone会像filter方法d一样被转发吗...

回答 2 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.