与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
我正在使用 Kafka 将数据保存到rocksdb 中。 现在我想看看 Kafka 创建的数据库键和值。 我下载了 FastNoSQL 并尝试但失败了。 该文件夹包含...
为什么 Apache Kafka Streams 使用 RocksDB 以及如何更改它?
在调查 Apache Kafka 0.9 和 0.10 的新功能时, 我们使用了 KStreams 和 KTables。有一个有趣的事实是,Kafka 内部使用的是 RocksDB。 请参阅卡夫卡街简介...
我使用Kafka Streams来处理用户数据的变化以及与用户操作相对应的事件。 我使用连接操作 (KStream-KTable) 丰富事件,然后将丰富的事件写入 Elastic...
Kafka Stream Transform - 按需获取数据并缓存,懒惰
当我们要求的密钥在其中找不到时,Kafka Streams 构建表/流来延迟获取数据的最佳方法是什么(如果有)? 假设有一个用户活动流 A...
我希望每当我调用此函数时再次消耗特定偏移量和分区的旧kafka事件。我应该使用 Kafka Stream 还是 kafkaListner
当旧事件数量较少或较多时,这是首选。我正在使用弹簧靴
我们正在使用 Kafka Streams 进行一些流处理,我们在可观测系统中看到一些奇怪的日志。 一切正常,处理工作没有任何问题,但是...
我以通常的方式创建了一个 Kafka Stream 拓扑。 公共拓扑 buildTopology() { StreamsBuilder 构建器 = new StreamsBuilder(); builder.stream(inputTopic, Consumed.with(...)) ...
我一直在努力解决一些与kafka流一起工作以及我的消费者如何期望数据的问题。我的管道的工作原理如下: 我配置了 couchbase 的源连接器,添加了 ...
当 kafka 节点从集群中关闭时,面临连接此 kafka 集群的 kafka 流问题
我使用3个节点的kafka集群,并且有多个kafka流和生产者连接。 当其中一个节点从此 kafka 集群关闭时,我遇到问题。 我面临的问题是
为 Kafka Streams 内部变更日志主题提供稳定的名称
我们的团队正在使用 Kafka Streams 版本 3.6.0。我们正在尝试在一些外部管理的代理上运行 Kafka Streams 拓扑。当 Kafka Streams 应用程序时,我收到以下错误...
我们正在尝试在我们的项目中使用 kafka 流来从一个主题读取数据并写入另一个主题,并且我们有一个使用 KafkaHeaders 作为过滤某些记录的机制的用例。 例如,...
我正在使用一个 Spring Boot Java 项目,其中我提到了一个消费者组,并且刚刚创建了一个消费者实例。这意味着如果我从这个消费者那里轮询数据,它应该默认重新...
我正在使用本地托管的 Memgraph 实例,并通过 Kafka Connect 和 Kafka 从 postgres 流式传输 CDC 数据。最近一直遇到交易错误,一直没找到so...
我正在努力解决 kafkastream 聚合背后的逻辑。 我有 A_B_C 形式的字符串键记录和基本上是 int 值的值,我想逐步聚合...
PySpark 结构化流作业的正常关闭会引发 Py4JNetworkError
我正在开发一个 PySpark 结构化流作业,该作业从 Kafka 主题读取数据并实时处理它。我想使用信号处理实现正常关闭,但我遇到了......
无法将数据从 Kafka 写入 Databricks 中的 Delta Live 表
我有这个代码: @dlt.表( 名称=“kafka_bronze”, table_properties={"pipelines.autoOptimize.enabled": "true"} ) def kafka_bronze(): df = (火花。
Kafka 生产者将记录发送到主题分区捕获 TimeoutException:过期 1 条记录
在实际环境中,使用flink连接kafka作为Kafka的生产者。现在,在环境:org.apache.k 中发送数据时出现“TimeoutException”异常信息...
假设我收到 1 到 15 条来自 kafka 的消息。我正在处理它并提交它,如果消息确实满足条件,我将取消提交特定消息。 现在,我的问题是如何...
多个 Spark 结构化流作业使用相同 Kafka 主题的问题
我有两个独立的 Python 脚本(job1.py 和 job2.py),它们使用 Spark 结构化流处理来自 Kafka 主题 test1 的数据。两个脚本都配置了相同的 Kafka 消费者组...
我第一次尝试卡夫卡流,我得到了一种我不太理解的行为。我有一个制作人以以下格式发送有关输入主题的消息: {“fooId”:“...