apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。


Kafka Streams 管理客户端断开连接

我们正在使用 Kafka Streams 进行一些流处理,我们在可观测系统中看到一些奇怪的日志。 一切正常,处理工作没有任何问题,但是...

回答 1 投票 0

跳过在 kafka 流拓扑上抛出运行时错误的消息

我以通常的方式创建了一个 Kafka Stream 拓扑。 公共拓扑 buildTopology() { StreamsBuilder 构建器 = new StreamsBuilder(); builder.stream(inputTopic, Consumed.with(...)) ...

回答 1 投票 0

未知的魔法字节-Kafka Json

我一直在努力解决一些与kafka流一起工作以及我的消费者如何期望数据的问题。我的管道的工作原理如下: 我配置了 couchbase 的源连接器,添加了 ...

回答 1 投票 0

当 kafka 节点从集群中关闭时,面临连接此 kafka 集群的 kafka 流问题

我使用3个节点的kafka集群,并且有多个kafka流和生产者连接。 当其中一个节点从此 kafka 集群关闭时,我遇到问题。 我面临的问题是

回答 1 投票 0

为 Kafka Streams 内部变更日志主题提供稳定的名称

我们的团队正在使用 Kafka Streams 版本 3.6.0。我们正在尝试在一些外部管理的代理上运行 Kafka Streams 拓扑。当 Kafka Streams 应用程序时,我收到以下错误...

回答 1 投票 0

Kafka 流使用标头过滤消息

我们正在尝试在我们的项目中使用 kafka 流来从一个主题读取数据并写入另一个主题,并且我们有一个使用 KafkaHeaders 作为过滤某些记录的机制的用例。 例如,...

回答 1 投票 0

无法读取某个主题所有分区的所有 Kafka 消息

我正在使用一个 Spring Boot Java 项目,其中我提到了一个消费者组,并且刚刚创建了一个消费者实例。这意味着如果我从这个消费者那里轮询数据,它应该默认重新...

回答 1 投票 0

Memgraph Stream 无法解决冲突的事务

我正在使用本地托管的 Memgraph 实例,并通过 Kafka Connect 和 Kafka 从 postgres 流式传输 CDC 数据。最近一直遇到交易错误,一直没找到so...

回答 1 投票 0

具有逐步限制键的 KafkaStream 聚合

我正在努力解决 kafkastream 聚合背后的逻辑。 我有 A_B_C 形式的字符串键记录和基本上是 int 值的值,我想逐步聚合...

回答 1 投票 0

PySpark 结构化流作业的正常关闭会引发 Py4JNetworkError

我正在开发一个 PySpark 结构化流作业,该作业从 Kafka 主题读取数据并实时处理它。我想使用信号处理实现正常关闭,但我遇到了......

回答 1 投票 0

无法将数据从 Kafka 写入 Databricks 中的 Delta Live 表

我有这个代码: @dlt.表( 名称=“kafka_bronze”, table_properties={"pipelines.autoOptimize.enabled": "true"} ) def kafka_bronze(): df = (火花。

回答 1 投票 0

Kafka 生产者将记录发送到主题分区捕获 TimeoutException:过期 1 条记录

在实际环境中,使用flink连接kafka作为Kafka的生产者。现在,在环境:org.apache.k 中发送数据时出现“TimeoutException”异常信息...

回答 1 投票 0

如何。读取或重试 kafka 中未提交消息的流程

假设我收到 1 到 15 条来自 kafka 的消息。我正在处理它并提交它,如果消息确实满足条件,我将取消提交特定消息。 现在,我的问题是如何...

回答 1 投票 0

多个 Spark 结构化流作业使用相同 Kafka 主题的问题

我有两个独立的 Python 脚本(job1.py 和 job2.py),它们使用 Spark 结构化流处理来自 Kafka 主题 test1 的数据。两个脚本都配置了相同的 Kafka 消费者组...

回答 1 投票 0

Kafka 流正确处理消息,但抛出反序列化异常

我第一次尝试卡夫卡流,我得到了一种我不太理解的行为。我有一个制作人以以下格式发送有关输入主题的消息: {“fooId”:“...

回答 1 投票 0

注册 Avro schema 时出错:写入 Kafka 存储时注册 schema 操作失败;错误代码:50001

我在发送从 .avsc 文件生成的对象时遇到了一个奇怪的问题。 在我的本地上,没有问题,但在开发服务器上,它会抛出异常 我使用生产者发送这种类型的回复...

回答 1 投票 0

带有更改日志主题的 Kafka Streams KTable 存储与日志压缩源主题

我正在一个输入主题上构建 KTable,并在两个 Kafka Stream 应用程序实例上加入 KStream。 KTable 的输入主题已经是日志压缩主题。所以当我的一个应用程序...

回答 1 投票 0

从 PySpark 结构化流将聚合数据写入 MongoDB 的问题

我在尝试从 PySpark 结构化流作业将聚合数据写入 MongoDB 时遇到问题。这是我的设置: 我有一个 Kafka 主题,我正在其中使用 JSON 消息。 我是...

回答 1 投票 0

运行时重新配置 Kafka Streams 过滤器

如何在运行时重新配置 Kafka Streams 应用程序中的过滤器(无需停止并重新启动应用程序)? 过滤内容的配置最好在 Kafka 配置主题中找到。 ...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.