Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。
在 Kafka Streams KGroupedTable.aggregate 中,加法器和减法器是否应该在每次调用时返回一个新的不可变聚合器?
我想将 KTable 条目聚合到集合映射(例如 Map>),因此在聚合函数中我需要提供加法器和减法器,如下所示: (键、值、映射)...
我有一个 kafkaStream 拓扑,如下所示: 溪流 .filter(((key, Trade) -> Trade.tradeTime != null && Trade.tradeTime > TodayMillis )) .
如何访问 Kafka Streams 中 GlobalKTable 中的标头
我正在尝试在应用程序启动期间使用 GlobalKTable 存储来自 Kafka 主题的数据。 GlobalKTable 和 KStream 都使用相同的主题,我需要访问
当多条消息输出时,Kafka Streams 使用 Transformer 发送自定义标头
我想使用 Transformer (kafka Streams dsl) 向下游发送多条消息 私有 ProcessorContext 上下文; @覆盖 公共无效初始化(ProcessorContext上下文){ 这个。
Kafka Streams 窗口大小(15 分钟)大于最大轮询间隔(5 分钟)
我有一个 Kstreams 应用程序,我正在从输入主题中读取内容,在 15 分钟的窗口中执行聚合,抑制然后对每个记录执行一些操作,以下是...
我正在探索Kafka 0.9.1的安全功能,但无法成功使用它。 我在我的 server.properties 中设置了以下配置 允许.everyone.if.no.acl.found = false 超级用户...
Apache Kafka Kraft - 已取消正在进行的 API_VERSIONS 请求
确认=-1 auto.include.jmx.reporter = true 批量大小 = 16384 bootstrap.servers = [本地主机:9092,本地主机:9094,本地主机:9096] 缓冲区内存= 33554432 客户端.dns.lookup =
在 Rust 中与 Kafka 交互时如何处理长时间运行的操作?
我正在编写一个 Rust 应用程序,它从 Kafka 读取一些数据并将其存档到磁盘。 将数据转换为正确格式,然后将数据写入磁盘的函数需要很长时间......
为什么 Debezium Mongo Source Kafka Connector 生成字符串 `after` 字段而不是 Json 对象?
这是我正在使用的配置 - { “名称”:“mongo-debezium-连接器”, “配置”:{ "connector.class": "io.debezium.connector.mongodb.
使用 Apache Kafka 提交上一条消息而不是最近读取的消息的方法?
在从 Kafka Consumer 消费数据时,有没有办法提交之前读取的消息而不是当前(最近读取的消息)? 解释一下我为什么要这样做。 我正在写...
来自单个消费者的 Spring Cloud Stream 多集群和多输入绑定
在我的项目中,我需要连接到两个不同的 Kafka 代理并消费两个主题的所有事件,每个 Kafka 代理上有一个主题。 我的 application.yaml 看起来有点像这样: 春天: 云...
Kafka DefaultErrorHandler 在异常后查找当前
在我使用kafka的spring boot消费者中,我已经配置了DefaultErrorHandler来重试失败的事件几次。 现在的问题是,每次重试后我都会看到异常的 stackTrace
tl;博士 有两个 k8s 集群、一个包含 3 个 Broker 的 Kafka 集群以及一个包含 6 个分区的主题。 有一个服务,其相同版本在两个 k8s 集群上运行。其中之一,f...
我是 Docker 新手,并试图了解它是如何工作的 阅读这个 Kafka docker 图像 https://github.com/conduktor/kafka-stack-docker-compose/blob/master/zk-multiple-kafka-single.yml 在这里你...
Kafka和Spark的Structured Streaming结果应该如何根据特定列插入到Iceberg Table中?没有分区
我已经成功设置了 Spark 的 Session 并从 Kafka 的 Topic 流式传输消息。 kafka_stream_df = 火花 \ .readStream \ .format('卡夫卡') \ .option('kafka.bootstrap.ser...
PySpark:如何从具有可变消息类型的 Kafka 消息反序列化原始有效负载
我正在尝试读取包含具有不同 Proto 有效负载的消息的 Kafka 主题。在 Kafka 消息键中设置 messageName。 但是当我尝试: df = Spark.readStream.format(const...
我正在使用现有应用程序 (C#) 中的 Confluence SDK,并转移到 Azure 事件中心 Kafka 端点。 我注意到构造函数参数中定义的消费者组,
我正在部署一个kafka连接集群,由4个使用docker swarm的工作人员组成。在初始部署时存在一些情况(当环境中不存在其他 kafka 连接集群时......
Docker 端口映射:为什么 Kafka 不能使用除 9092 之外的本地机器端口?
Windows 10 专业版。 Docker v27.4.0。 我的 C# 应用程序与 Kafka 消息代理配合使用。 Kafka 在我本地机器上的 docker 容器中工作。 如果我在本地机器上使用 9092 端口进行端口映射(...
RabbitMQ 源连接器配置问题 - SSL 的 java.io.IOException 错误
我的 RabbitMQ 源连接器配置遇到问题。当我尝试设置连接器时,我收到 400 Bad Request 错误,并在连接期间出现多个 java.io.IOException 错误