如何在 Kubernetes multipod 部署中使用 spring kafka 处理 Kafka 容器生命周期
我正在使用 Spring kafka 实现,我需要通过 REST API 启动和停止我的 kafka 消费者。为此,我正在使用 KafkaListenerEndpointRegistry endpointRegistry 端点注册表。
我正在寻找一种从 Kafka 主题中删除(完全删除)已使用记录的方法。我知道有几种方法可以做到这一点,通过更改主题的保留时间或删除...
从 kafka 连接 API 获取任务 ID 以在日志中打印
我有一个kafka连接接收器代码,下面的json作为curl命令传递来注册任务。 如果有人知道如何获取我的连接的任务 ID,请告诉我。例如在
在结构化流 API 中跨多个集群使用共享 Kafka 主题执行 Spark 作业
我正在开发一个 Spark 项目,我需要在两个不同的集群上运行作业,两个集群都使用相同的 Kafka 主题。我希望这些作业能够有效地共享负载并平衡
我正在尝试使用“camel-azure-storage-datalake-kafka-connector”从 Kafka 连接到 Azure ADLS Gen2 我有一个运行 Docker 的 Linux 机器,其中包含 debezium/zookeeper、debezium/kafka 和 debe...
我是 Kafka 和设置 Kafka UI 的新手,尽管我在最后的 docker 和 docker-compose 上设置了先决条件。 我最后有一个 3 节点 Kafka 集群设置。 下面是 docker-compose...
Kafka Java Consumer Client 是单线程的吗
我们正在开始使用 Kafka, 在阅读本文时 - https://docs.confluence.io/kafka-clients/java/current/overview.html - 它似乎暗示客户端是单线程的。 * 由于这个...
如何使用 kafka msg key 作为 s3 连接器中的分区标准或 我怎样才能获得密钥并将其存储在 s3 对象中 谢谢!
我正在尝试将元数据添加到 kafka 的输出到 S3 存储桶中。 目前,输出只是来自 kafka 主题的消息的值。 我想用下面的东西把它包起来......
即使部署在 kubernetes pod 上,kafka 主题仍然是不可变的吗?
我在 kubernetes pod 上部署了 kafka 主题和模式注册表,我尝试修改/更改 kafka 主题和模式注册表的清单文件,然后模式注册表的行为在
我创建了一个只有一个分区且在本地主机上没有复制的 kafka 主题,通过 kafka 控制台消费者和控制台生产者测试了消息传输,它工作正常,但在 tr...
使用 kafka-go 和循环平衡器时,数据始终进入分区 0
我正在使用 kafka-go 库将消息写入 Kafka。我正在使用循环平衡器,但数据始终进入分区 0。我尝试忽略所有消息的分区字段,但是...
我如何在本地运行假kafka主题(内存中)来测试kafka?
我尝试了一些依赖项,它期望安装docker或抛出运行时异常 我想在没有 Docker 设置的情况下在指定端口本地运行一个假 Kafka。 还有我的申请...
我想从Kafka获取数据,此方法成功获取记录但无法传递给变量。这是我的代码 公共无效 subscribeFromKafka() 抛出异常 { 列表结果=新
在我的 SpringBoot Java 项目中,我使用的是 kafka,特别是 ReactiveKafka。我正在更新依赖项,特别是这些依赖项: springboot 2.6.6 -> 3.1.5 弹簧卡夫卡 2.8.0 -> 3.0.11 反应堆-
我是容器化新手。我正在尝试设置我的本地环境,我的 java 应用程序想要连接到 Kafka。无法使用 Docker,所以决定使用 Podman。我有三个容器在同一个上运行
Spring Boot 3.1.X及以上版本的Kafka客户端连接问题
我最近将我的一项 Spring Boot 服务升级到 3.1.x,升级后我遇到了 kafka 问题。它似乎无法连接并不断向我提供以下日志。 2024-01-03T06:18...
有没有办法将AWS Cloudwatch日志输入Kafka主题
我正在努力寻找这方面的任何方向。我有一个内部系统可以处理日志以进行监控。我希望从 Cloudwatch 发送错误并在 kafka 主题上发布,其中...
我们正在尝试在我们的项目中使用 kafka 流来从一个主题读取数据并写入另一个主题,并且我们有一个使用 KafkaHeaders 作为过滤某些记录的机制的用例。 例如,...
我有两个具有相同组ID的消费者服务器订阅了相同的主题。 一台 kafka 服务器仅运行一个分区。 据我所知,消息应该在这两个中随机消耗
具有手动偏移提交功能的 Kafka 消费者客户端一次只允许客户端
我目前正在使用一个Java Kafka消费者,它手动提交偏移量(enable.auto.commit = false),我发现即使我生成了多个实例,我发现这样的设置也是如此
如何仅删除已消费的消息以及如何在kafka主题中显示未消费的消息?
我们将一个项目从ActiveMQ迁移到Kafka。 过去我们向很多队列写入了太多的消息,消费完之后,ActiveMQ会自动删除消费的消息。仅未消耗
Kafka 保留设置 - 如果所有消费者组都消费了一个主题,则从队列中删除
假设我有一个kafka队列和一个名为TOPIC的主题,并且我有两个消费者组CONSUMER1和CONSUMER2。我在 TOPIC 中添加了 1000 条数据。 Consumer1 有消费者 800 条数据,CONSUMER2 有
Python KafkaTimeoutError:等待未来超时
我正在使用 Kafka 将日志发送到主题。发送消息时,我总是收到此错误 消息:“测试日志” 参数:() --- 记录错误 --- 回溯(最近一次调用最后一次): 文件“...
Apache Spark Structured Streaming 中 Spark UI 上的查询和阶段卡住了
我在 EMR 集群 (6.14) 上使用 Apache Spark Structured Streaming (3.1.2)。 Spark 结构化流将数据从 Apache Kafka 流式传输到 Delta Lake 表。当我打开 Spark UI 时,我看到以下内容
debezium 日期/时间字段值超出范围:0000-12-30T00:00:00Z
我们使用 Debezium 将数据同步到 在源表中我们有列timestamptz start_at,当值为0时start_at='0001-01-01 00:00:00.000000 +00:00',但是当我们检查kafka中的数据时,它是
Groovy 抛出 可能的解决方案:解析 LinkedHashMap 时出现 parseText(java.lang.String) 错误
我正在尝试检查kafka输出消息中是否存在该密钥,如果存在则进行进一步的操作。 卡夫卡主题的输出消息如下 [“随机名称_547hcg”:{ “访问_...
我有一个使用 AWS 上的 MSK 集群的 kafkaStreams 应用程序。 我需要清理状态存储(在我的应用程序中使用一些 KTable 后创建)。 我找不到任何方法来访问文件系统......
自消息发布或 sinse 服务器启动以来,kafka 是否计数 log.retatantion
如果我将 log.retantion 设置为 24 小时,则在 1.1.24 15:30 发布了一条消息。 然后服务器宕机了25小时,24年1月16日16:30再次启动,消息会立即删除吗...
假设最初我们有一个包含 3 个分区的主题和一个包含 3 个消费者的消费者组,从该主题进行消费。如果我们在消费者组中再添加一个消费者,分区会重新平衡吗
每当我在 GCP 中创建一个新项目时,它都会预加载许多我不想要的 API/服务: BigQuery API BigQuery 迁移 API BigQuery 存储 API ...
我有一个 REST API,它使用基本身份验证进行身份验证。我将此 API 添加到 WSO2 API 管理器并获取该 API 的生产 URL。基本上我需要 API 管理器来查看 API 使用情况的统计数据。我
如何向 iPhone 应用程序添加私有 API 和框架。像Apple80211.
我有两个 API 端点,如下所示。 [路线(“api/[控制器]”)] [API控制器] 公共类 StudentController : ControllerBase { IStudentRepository 学生存储库; 公开
我有一个API URL,是这样的 https://api-amvstrm.nyt92.eu.org/api/v1 /popular 和 API 响应如下所示 t itle": "地牢网格", "id": "地牢-...
.NET 如何允许 API 探索已编译的 DLL?
关于实施和迁移到 API V3 的 YouTube 官方文档,他们说: YouTube 数据 API (v2) 功能:检索视频推荐 v3 API 不会检索以下列表
如何使用 OpenCL C++ API 获取设备 cl_device_id
我开始使用 OpenCL C++ API。我已经使用 C API 很长时间了。 C++ API 更加优雅、简单,代码更少且不易出错,但我需要设备 ID。我...
我试图弄清楚如何修改此处找到的 Foursquare iOS API“BZFoursquare”:https://github.com/baztokyo/foursquare-ios-api。 在他们的示例中,他们使用一体化 ViewContro...
如何在Flutter中实现凭证管理API? 我想在 Google 中保存登录凭据。如何在浏览器中调用凭证管理API?
我想从 org.sonar.api.scanner.sensor.ProjectSensor 的实现中执行对 Sonar Server 的 API 调用。是否有任何“合规”的执行方式,例如: https://myserver:9001/api...
在 Firely .Net Sdk 中创建患者 FHIR API 负载时出错
我正在尝试使用以下 API Url 在 EPIC 沙盒中创建患者 https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4/Patient 我正在使用 Firely .Net Sdk 进行 EPIC FHIR API 集成...
我正在配置 AWS CloudFront。我有两条后端路线 /api/v3/test1/统计 /api/v3/test2/主题 我创建了 2 个行为 /api/v3/*/统计信息 /api/v3/*/主题 我正在使用预定义的策略
如何在 svelteKit api 目录中创建 catch-all 路由?
我正在 sveltekit 中创建 API 端点。所有 API 端点都位于 /src/routes/api/DIR 内。 如何为任何不存在的 GET、POST、DELETE 和 PUT 请求创建主捕获所有路由?
Google 地图 API 通过 API 发出的路线请求返回 ZERO_RESULTS,但适用于 Google 地图
有问题的呼叫是: https://maps.googleapis.com/maps/api/directions/json?origin=35.73455050,-95.31531510&destination=29.67404860,-95.54087240&waypoints=29.92853940,-95.29782860...
MPMoviePlayerController 的“currentPlaybackTime”属性是私有 API
我不确定这个私有 API 的事情。 MPMoviePlayerController 的 MPMediaPlayback 协议中的属性 currentPlaybackTime 是私有 API 吗? 我问,因为这个专业人士...
我正在尝试在 docker 容器内的两个 .BET API 之间进行通信。我通过 docker-compose 上传两个 API 和数据库,我可以单独访问这两个 API,但是当我尝试...
我像这样设置 lambda 函数: 我想部署在 api 上。当我将 api 设置为休息 api 时,我尝试测试它,如下所示: 我收到以下错误: 要求 / 潜伏 43 地位 200 回应...
我使用 MediatR 编写了一个简单的 API,并附有我在网上找到的示例。它有一个 GET 和一个 POST。 API 获取: 命名空间 API.Features.My.Queries; [路线(“api/mydata/{myid}”)] 公开
{"cod":401, "message": "使用 Moya 时出现 Open Weather API 错误,API 密钥无效
所以我使用moya创建了一个对openweatherAPI的API请求。现在 Postman 的返回似乎没问题,但 X 代码上的 API 调用返回 401: Invalid API key 我已经尝试了很多方法来看看到底是什么......