apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

RocksDb sst 文件的 GUI 查看器

我正在使用 Kafka 将数据保存到rocksdb 中。 现在我想看看 Kafka 创建的数据库键和值。 我下载了 FastNoSQL 并尝试但失败了。 该文件夹包含...

回答 2 投票 0

为什么 Apache Kafka Streams 使用 RocksDB 以及如何更改它?

在调查 Apache Kafka 0.9 和 0.10 的新功能时, 我们使用了 KStreams 和 KTables。有一个有趣的事实是,Kafka 内部使用的是 RocksDB。 请参阅卡夫卡街简介...

回答 2 投票 0

如何更新CQRS中读取模型中现有的丰富事件?

我使用Kafka Streams来处理用户数据的变化以及与用户操作相对应的事件。 我使用连接操作 (KStream-KTable) 丰富事件,然后将丰富的事件写入 Elastic...

回答 1 投票 0

Kafka Stream Transform - 按需获取数据并缓存,懒惰

当我们要求的密钥在其中找不到时,Kafka Streams 构建表/流来延迟获取数据的最佳方法是什么(如果有)? 假设有一个用户活动流 A...

回答 1 投票 0


Kafka Streams 管理客户端断开连接

我们正在使用 Kafka Streams 进行一些流处理,我们在可观测系统中看到一些奇怪的日志。 一切正常,处理工作没有任何问题,但是...

回答 1 投票 0

跳过在 kafka 流拓扑上抛出运行时错误的消息

我以通常的方式创建了一个 Kafka Stream 拓扑。 公共拓扑 buildTopology() { StreamsBuilder 构建器 = new StreamsBuilder(); builder.stream(inputTopic, Consumed.with(...)) ...

回答 1 投票 0

未知的魔法字节-Kafka Json

我一直在努力解决一些与kafka流一起工作以及我的消费者如何期望数据的问题。我的管道的工作原理如下: 我配置了 couchbase 的源连接器,添加了 ...

回答 1 投票 0

当 kafka 节点从集群中关闭时,面临连接此 kafka 集群的 kafka 流问题

我使用3个节点的kafka集群,并且有多个kafka流和生产者连接。 当其中一个节点从此 kafka 集群关闭时,我遇到问题。 我面临的问题是

回答 1 投票 0

为 Kafka Streams 内部变更日志主题提供稳定的名称

我们的团队正在使用 Kafka Streams 版本 3.6.0。我们正在尝试在一些外部管理的代理上运行 Kafka Streams 拓扑。当 Kafka Streams 应用程序时,我收到以下错误...

回答 1 投票 0

Kafka 流使用标头过滤消息

我们正在尝试在我们的项目中使用 kafka 流来从一个主题读取数据并写入另一个主题,并且我们有一个使用 KafkaHeaders 作为过滤某些记录的机制的用例。 例如,...

回答 1 投票 0

无法读取某个主题所有分区的所有 Kafka 消息

我正在使用一个 Spring Boot Java 项目,其中我提到了一个消费者组,并且刚刚创建了一个消费者实例。这意味着如果我从这个消费者那里轮询数据,它应该默认重新...

回答 1 投票 0

Memgraph Stream 无法解决冲突的事务

我正在使用本地托管的 Memgraph 实例,并通过 Kafka Connect 和 Kafka 从 postgres 流式传输 CDC 数据。最近一直遇到交易错误,一直没找到so...

回答 1 投票 0

具有逐步限制键的 KafkaStream 聚合

我正在努力解决 kafkastream 聚合背后的逻辑。 我有 A_B_C 形式的字符串键记录和基本上是 int 值的值,我想逐步聚合...

回答 1 投票 0

PySpark 结构化流作业的正常关闭会引发 Py4JNetworkError

我正在开发一个 PySpark 结构化流作业,该作业从 Kafka 主题读取数据并实时处理它。我想使用信号处理实现正常关闭,但我遇到了......

回答 1 投票 0

无法将数据从 Kafka 写入 Databricks 中的 Delta Live 表

我有这个代码: @dlt.表( 名称=“kafka_bronze”, table_properties={"pipelines.autoOptimize.enabled": "true"} ) def kafka_bronze(): df = (火花。

回答 1 投票 0

Kafka 生产者将记录发送到主题分区捕获 TimeoutException:过期 1 条记录

在实际环境中,使用flink连接kafka作为Kafka的生产者。现在,在环境:org.apache.k 中发送数据时出现“TimeoutException”异常信息...

回答 1 投票 0

如何。读取或重试 kafka 中未提交消息的流程

假设我收到 1 到 15 条来自 kafka 的消息。我正在处理它并提交它,如果消息确实满足条件,我将取消提交特定消息。 现在,我的问题是如何...

回答 1 投票 0

多个 Spark 结构化流作业使用相同 Kafka 主题的问题

我有两个独立的 Python 脚本(job1.py 和 job2.py),它们使用 Spark 结构化流处理来自 Kafka 主题 test1 的数据。两个脚本都配置了相同的 Kafka 消费者组...

回答 1 投票 0

Kafka 流正确处理消息,但抛出反序列化异常

我第一次尝试卡夫卡流,我得到了一种我不太理解的行为。我有一个制作人以以下格式发送有关输入主题的消息: {“fooId”:“...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.