apache-kafka 相关问题

Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。

在 Kafka Streams KGroupedTable.aggregate 中,加法器和减法器是否应该在每次调用时返回一个新的不可变聚合器?

我想将 KTable 条目聚合到集合映射(例如 Map>),因此在聚合函数中我需要提供加法器和减法器,如下所示: (键、值、映射)...

回答 1 投票 0

在聚合函数中访问窗口开始时间

我有一个 kafkaStream 拓扑,如下所示: 溪流 .filter(((key, Trade) -> Trade.tradeTime != null && Trade.tradeTime > TodayMillis )) .

回答 1 投票 0

如何访问 Kafka Streams 中 GlobalKTable 中的标头

我正在尝试在应用程序启动期间使用 GlobalKTable 存储来自 Kafka 主题的数据。 GlobalKTable 和 KStream 都使用相同的主题,我需要访问

回答 1 投票 0

当多条消息输出时,Kafka Streams 使用 Transformer 发送自定义标头

我想使用 Transformer (kafka Streams dsl) 向下游发送多条消息 私有 ProcessorContext 上下文; @覆盖 公共无效初始化(ProcessorContext上下文){ 这个。

回答 1 投票 0

Kafka Streams 窗口大小(15 分钟)大于最大轮询间隔(5 分钟)

我有一个 Kstreams 应用程序,我正在从输入主题中读取内容,在 15 分钟的窗口中执行聚合,抑制然后对每个记录执行一些操作,以下是...

回答 1 投票 0

Kafka 0.9.1授权

我正在探索Kafka 0.9.1的安全功能,但无法成功使用它。 我在我的 server.properties 中设置了以下配置 允许.everyone.if.no.acl.found = false 超级用户...

回答 1 投票 0

Apache Kafka Kraft - 已取消正在进行的 API_VERSIONS 请求

确认=-1 auto.include.jmx.reporter = true 批量大小 = 16384 bootstrap.servers = [本地主机:9092,本地主机:9094,本地主机:9096] 缓冲区内存= 33554432 客户端.dns.lookup =

回答 1 投票 0

在 Rust 中与 Kafka 交互时如何处理长时间运行的操作?

我正在编写一个 Rust 应用程序,它从 Kafka 读取一些数据并将其存档到磁盘。 将数据转换为正确格式,然后将数据写入磁盘的函数需要很长时间......

回答 1 投票 0

为什么 Debezium Mongo Source Kafka Connector 生成字符串 `after` 字段而不是 Json 对象?

这是我正在使用的配置 - { “名称”:“mongo-debezium-连接器”, “配置”:{ "connector.class": "io.debezium.connector.mongodb.

回答 1 投票 0

使用 Apache Kafka 提交上一条消息而不是最近读取的消息的方法?

在从 Kafka Consumer 消费数据时,有没有办法提交之前读取的消息而不是当前(最近读取的消息)? 解释一下我为什么要这样做。 我正在写...

回答 1 投票 0

来自单个消费者的 Spring Cloud Stream 多集群和多输入绑定

在我的项目中,我需要连接到两个不同的 Kafka 代理并消费两个主题的所有事件,每个 Kafka 代理上有一个主题。 我的 application.yaml 看起来有点像这样: 春天: 云...

回答 1 投票 0

Kafka DefaultErrorHandler 在异常后查找当前

在我使用kafka的spring boot消费者中,我已经配置了DefaultErrorHandler来重试失败的事件几次。 现在的问题是,每次重试后我都会看到异常的 stackTrace

回答 1 投票 0

Kafka 错误 org.apache.kafka.common.errors.NotLeaderOrFollowerException NOT_LEADER_OR_FOLLOWER 仅在两个 k8s 集群之一上

tl;博士 有两个 k8s 集群、一个包含 3 个 Broker 的 Kafka 集群以及一个包含 6 个分区的主题。 有一个服务,其相同版本在两个 k8s 集群上运行。其中之一,f...

回答 1 投票 0

docker如何将env变量映射到kafka代理变量?

我是 Docker 新手,并试图了解它是如何工作的 阅读这个 Kafka docker 图像 https://github.com/conduktor/kafka-stack-docker-compose/blob/master/zk-multiple-kafka-single.yml 在这里你...

回答 1 投票 0

Kafka和Spark的Structured Streaming结果应该如何根据特定列插入到Iceberg Table中?没有分区

我已经成功设置了 Spark 的 Session 并从 Kafka 的 Topic 流式传输消息。 kafka_stream_df = 火花 \ .readStream \ .format('卡夫卡') \ .option('kafka.bootstrap.ser...

回答 1 投票 0

PySpark:如何从具有可变消息类型的 Kafka 消息反序列化原始有效负载

我正在尝试读取包含具有不同 Proto 有效负载的消息的 Kafka 主题。在 Kafka 消息键中设置 messageName。 但是当我尝试: df = Spark.readStream.format(const...

回答 1 投票 0

Azure 事件中心/Confluence SDK

我正在使用现有应用程序 (C#) 中的 Confluence SDK,并转移到 Azure 事件中心 Kafka 端点。 我注意到构造函数参数中定义的消费者组,

回答 1 投票 0

Kafka 连接不断重新平衡

我正在部署一个kafka连接集群,由4个使用docker swarm的工作人员组成。在初始部署时存在一些情况(当环境中不存在其他 kafka 连接集群时......

回答 1 投票 0

Docker 端口映射:为什么 Kafka 不能使用除 9092 之外的本地机器端口?

Windows 10 专业版。 Docker v27.4.0。 我的 C# 应用程序与 Kafka 消息代理配合使用。 Kafka 在我本地机器上的 docker 容器中工作。 如果我在本地机器上使用 9092 端口进行端口映射(...

回答 1 投票 0

RabbitMQ 源连接器配置问题 - SSL 的 java.io.IOException 错误

我的 RabbitMQ 源连接器配置遇到问题。当我尝试设置连接器时,我收到 400 Bad Request 错误,并在连接期间出现多个 java.io.IOException 错误

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.