为什么这个查询没有输出? |我想过滤掉那些从未获得最高或最低分数的('student_id',student_name')
查询1: 与 cte AS ( 选择 *, MAX(分数) OVER (PARTITION BY exam_id) AS max_score, MIN(分数) OVER (PARTITION BY exam_id) AS min_score 从 学生加入...
我正在尝试使用“camel-azure-storage-datalake-kafka-connector”从 Kafka 连接到 Azure ADLS Gen2 我有一个运行 Docker 的 Linux 机器,其中包含 debezium/zookeeper、debezium/kafka 和 debe...
我是 Kafka 和设置 Kafka UI 的新手,尽管我在最后的 docker 和 docker-compose 上设置了先决条件。 我最后有一个 3 节点 Kafka 集群设置。 下面是 docker-compose...
如何在 Kubernetes multipod 部署中使用 spring kafka 处理 Kafka 容器生命周期
我正在使用 Spring kafka 实现,我需要通过 REST API 启动和停止我的 kafka 消费者。为此,我正在使用 KafkaListenerEndpointRegistry endpointRegistry 端点注册表。
Kafka Java Consumer Client 是单线程的吗
我们正在开始使用 Kafka, 在阅读本文时 - https://docs.confluence.io/kafka-clients/java/current/overview.html - 它似乎暗示客户端是单线程的。 * 由于这个...
如何使用 kafka msg key 作为 s3 连接器中的分区标准或 我怎样才能获得密钥并将其存储在 s3 对象中 谢谢!
尝试使用 PARTITION BY 时出现 SQlite SELECT OVER(...) 语法错误
我在 SQLite 表中有遥测记录,其中包含三个字段:record_name、record_time、record_value。 我想从表中为每个 record_name 选择最新值。我想出了...
我正在尝试将元数据添加到 kafka 的输出到 S3 存储桶中。 目前,输出只是来自 kafka 主题的消息的值。 我想用下面的东西把它包起来......
即使部署在 kubernetes pod 上,kafka 主题仍然是不可变的吗?
我在 kubernetes pod 上部署了 kafka 主题和模式注册表,我尝试修改/更改 kafka 主题和模式注册表的清单文件,然后模式注册表的行为在
我创建了一个只有一个分区且在本地主机上没有复制的 kafka 主题,通过 kafka 控制台消费者和控制台生产者测试了消息传输,它工作正常,但在 tr...
使用 kafka-go 和循环平衡器时,数据始终进入分区 0
我正在使用 kafka-go 库将消息写入 Kafka。我正在使用循环平衡器,但数据始终进入分区 0。我尝试忽略所有消息的分区字段,但是...
带有 PARTITION BY 和 ORDER BY 的 OVER 子句可忽略主查询的 WHERE 子句
假设我们有以下数据表,我临时生成并填充到下面的 SQL 查询中。我想要的是找到给定名称的第一个循环记录,忽略 where 子句
我如何在本地运行假kafka主题(内存中)来测试kafka?
我尝试了一些依赖项,它期望安装docker或抛出运行时异常 我想在没有 Docker 设置的情况下在指定端口本地运行一个假 Kafka。 还有我的申请...
我想从Kafka获取数据,此方法成功获取记录但无法传递给变量。这是我的代码 公共无效 subscribeFromKafka() 抛出异常 { 列表结果=新
在我的 SpringBoot Java 项目中,我使用的是 kafka,特别是 ReactiveKafka。我正在更新依赖项,特别是这些依赖项: springboot 2.6.6 -> 3.1.5 弹簧卡夫卡 2.8.0 -> 3.0.11 反应堆-
我是容器化新手。我正在尝试设置我的本地环境,我的 java 应用程序想要连接到 Kafka。无法使用 Docker,所以决定使用 Podman。我有三个容器在同一个上运行
Spring Boot 3.1.X及以上版本的Kafka客户端连接问题
我最近将我的一项 Spring Boot 服务升级到 3.1.x,升级后我遇到了 kafka 问题。它似乎无法连接并不断向我提供以下日志。 2024-01-03T06:18...
有没有办法将AWS Cloudwatch日志输入Kafka主题
我正在努力寻找这方面的任何方向。我有一个内部系统可以处理日志以进行监控。我希望从 Cloudwatch 发送错误并在 kafka 主题上发布,其中...
我正在寻找一种从 Kafka 主题中删除(完全删除)已使用记录的方法。我知道有几种方法可以做到这一点,通过更改主题的保留时间或删除...
从 kafka 连接 API 获取任务 ID 以在日志中打印
我有一个kafka连接接收器代码,下面的json作为curl命令传递来注册任务。 如果有人知道如何获取我的连接的任务 ID,请告诉我。例如在
我们正在尝试在我们的项目中使用 kafka 流来从一个主题读取数据并写入另一个主题,并且我们有一个使用 KafkaHeaders 作为过滤某些记录的机制的用例。 例如,...
我有两个具有相同组ID的消费者服务器订阅了相同的主题。 一台 kafka 服务器仅运行一个分区。 据我所知,消息应该在这两个中随机消耗
具有手动偏移提交功能的 Kafka 消费者客户端一次只允许客户端
我目前正在使用一个Java Kafka消费者,它手动提交偏移量(enable.auto.commit = false),我发现即使我生成了多个实例,我发现这样的设置也是如此
如何仅删除已消费的消息以及如何在kafka主题中显示未消费的消息?
我们将一个项目从ActiveMQ迁移到Kafka。 过去我们向很多队列写入了太多的消息,消费完之后,ActiveMQ会自动删除消费的消息。仅未消耗
在结构化流 API 中跨多个集群使用共享 Kafka 主题执行 Spark 作业
我正在开发一个 Spark 项目,我需要在两个不同的集群上运行作业,两个集群都使用相同的 Kafka 主题。我希望这些作业能够有效地共享负载并平衡
Python KafkaTimeoutError:等待未来超时
我正在使用 Kafka 将日志发送到主题。发送消息时,我总是收到此错误 消息:“测试日志” 参数:() --- 记录错误 --- 回溯(最近一次调用最后一次): 文件“...
Apache Spark Structured Streaming 中 Spark UI 上的查询和阶段卡住了
我在 EMR 集群 (6.14) 上使用 Apache Spark Structured Streaming (3.1.2)。 Spark 结构化流将数据从 Apache Kafka 流式传输到 Delta Lake 表。当我打开 Spark UI 时,我看到以下内容
debezium 日期/时间字段值超出范围:0000-12-30T00:00:00Z
我们使用 Debezium 将数据同步到 在源表中我们有列timestamptz start_at,当值为0时start_at='0001-01-01 00:00:00.000000 +00:00',但是当我们检查kafka中的数据时,它是
Groovy 抛出 可能的解决方案:解析 LinkedHashMap 时出现 parseText(java.lang.String) 错误
我正在尝试检查kafka输出消息中是否存在该密钥,如果存在则进行进一步的操作。 卡夫卡主题的输出消息如下 [“随机名称_547hcg”:{ “访问_...
我有一个使用 AWS 上的 MSK 集群的 kafkaStreams 应用程序。 我需要清理状态存储(在我的应用程序中使用一些 KTable 后创建)。 我找不到任何方法来访问文件系统......
自消息发布或 sinse 服务器启动以来,kafka 是否计数 log.retatantion
如果我将 log.retantion 设置为 24 小时,则在 1.1.24 15:30 发布了一条消息。 然后服务器宕机了25小时,24年1月16日16:30再次启动,消息会立即删除吗...
假设最初我们有一个包含 3 个分区的主题和一个包含 3 个消费者的消费者组,从该主题进行消费。如果我们在消费者组中再添加一个消费者,分区会重新平衡吗