与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
我希望每当我调用此函数时再次消耗特定偏移量和分区的旧kafka事件。我应该使用 Kafka Stream 还是 kafkaListner
当旧事件数量较少或较多时,这是首选。我正在使用弹簧靴
我们正在使用 Kafka Streams 进行一些流处理,我们在可观测系统中看到一些奇怪的日志。 一切正常,处理工作没有任何问题,但是...
我以通常的方式创建了一个 Kafka Stream 拓扑。 公共拓扑 buildTopology() { StreamsBuilder 构建器 = new StreamsBuilder(); builder.stream(inputTopic, Consumed.with(...)) ...
我一直在努力解决一些与kafka流一起工作以及我的消费者如何期望数据的问题。我的管道的工作原理如下: 我配置了 couchbase 的源连接器,添加了 ...
当 kafka 节点从集群中关闭时,面临连接此 kafka 集群的 kafka 流问题
我使用3个节点的kafka集群,并且有多个kafka流和生产者连接。 当其中一个节点从此 kafka 集群关闭时,我遇到问题。 我面临的问题是
为 Kafka Streams 内部变更日志主题提供稳定的名称
我们的团队正在使用 Kafka Streams 版本 3.6.0。我们正在尝试在一些外部管理的代理上运行 Kafka Streams 拓扑。当 Kafka Streams 应用程序时,我收到以下错误...
我们正在尝试在我们的项目中使用 kafka 流来从一个主题读取数据并写入另一个主题,并且我们有一个使用 KafkaHeaders 作为过滤某些记录的机制的用例。 例如,...
我正在使用一个 Spring Boot Java 项目,其中我提到了一个消费者组,并且刚刚创建了一个消费者实例。这意味着如果我从这个消费者那里轮询数据,它应该默认重新...
我正在使用本地托管的 Memgraph 实例,并通过 Kafka Connect 和 Kafka 从 postgres 流式传输 CDC 数据。最近一直遇到交易错误,一直没找到so...
我正在努力解决 kafkastream 聚合背后的逻辑。 我有 A_B_C 形式的字符串键记录和基本上是 int 值的值,我想逐步聚合...
PySpark 结构化流作业的正常关闭会引发 Py4JNetworkError
我正在开发一个 PySpark 结构化流作业,该作业从 Kafka 主题读取数据并实时处理它。我想使用信号处理实现正常关闭,但我遇到了......
无法将数据从 Kafka 写入 Databricks 中的 Delta Live 表
我有这个代码: @dlt.表( 名称=“kafka_bronze”, table_properties={"pipelines.autoOptimize.enabled": "true"} ) def kafka_bronze(): df = (火花。
Kafka 生产者将记录发送到主题分区捕获 TimeoutException:过期 1 条记录
在实际环境中,使用flink连接kafka作为Kafka的生产者。现在,在环境:org.apache.k 中发送数据时出现“TimeoutException”异常信息...
假设我收到 1 到 15 条来自 kafka 的消息。我正在处理它并提交它,如果消息确实满足条件,我将取消提交特定消息。 现在,我的问题是如何...
多个 Spark 结构化流作业使用相同 Kafka 主题的问题
我有两个独立的 Python 脚本(job1.py 和 job2.py),它们使用 Spark 结构化流处理来自 Kafka 主题 test1 的数据。两个脚本都配置了相同的 Kafka 消费者组...
我第一次尝试卡夫卡流,我得到了一种我不太理解的行为。我有一个制作人以以下格式发送有关输入主题的消息: {“fooId”:“...
注册 Avro schema 时出错:写入 Kafka 存储时注册 schema 操作失败;错误代码:50001
我在发送从 .avsc 文件生成的对象时遇到了一个奇怪的问题。 在我的本地上,没有问题,但在开发服务器上,它会抛出异常 我使用生产者发送这种类型的回复...
带有更改日志主题的 Kafka Streams KTable 存储与日志压缩源主题
我正在一个输入主题上构建 KTable,并在两个 Kafka Stream 应用程序实例上加入 KStream。 KTable 的输入主题已经是日志压缩主题。所以当我的一个应用程序...
从 PySpark 结构化流将聚合数据写入 MongoDB 的问题
我在尝试从 PySpark 结构化流作业将聚合数据写入 MongoDB 时遇到问题。这是我的设置: 我有一个 Kafka 主题,我正在其中使用 JSON 消息。 我是...
如何在运行时重新配置 Kafka Streams 应用程序中的过滤器(无需停止并重新启动应用程序)? 过滤内容的配置最好在 Kafka 配置主题中找到。 ...