用于与Apache Kafka使用者API相关的问题
我正在每半小时运行一次的服务中调用该课程。我只想从主题列表中获取最后一条消息。 但随机我没有得到最新的记录。相反,它显示了
我是卡夫卡新手。并对 kafka 及其配置有了基本的了解。 我的问题是: 如果消费者应用程序在消费者组中有2个消费者。现在,如果我自动缩放(增加)pod,
当自动提交设置为 false 时,Kafka 监听器开始消费消息时如何重新传递消息
假设我启动消费者时将自动提交设置为 false 并且消费者开始监听消息。 我的监听器处理了 50 条消息,轮询总数为 100 条,然后只处理了 50 条......
大家 我是卡夫卡的新手。我正在为一个 python 模块编写代码,该模块将处理来自 Kafka 的消息。该模块将使用 docker 进行容器化。该模块将读取 Kafka 消息,执行一些操作
卡夫卡版本:0.9 我们有一个 Java 应用程序尝试从 Kafka 读取数据。但我们看到 kafka 消息中的数据损坏 以下是 Java 应用程序的错误: org.apache.kafka.common.
如何实现ReplyingKafkaTemplate而不使用@KafkaListener和@SendTo发送响应到回复主题
美好的一天。 我正在尝试使用 ReplyingKafkaTemplate 实现同步请求回复消息流。作为个人 POC,它运行良好。但我的应用程序已经实现了 kafka 来消费
Kafka Consumer 第一次 poll(0) 没有返回数据
我正在使用 confluence-kafka-client。我有一个生产者生产一个主题,其中一个分区和一个组 ID 内有一个消费者。首先,我为该主题创建一个生产者(使用默认配置)...
引起:org.apache.common.config.ConfigException:bootstrap.server 中没有给出可解析的引导 URL 当我在配置中添加此内容时: @配置 类主题配置{ 公关...
Golang Segmentio 在消费者组迁移时获取半分区
我正在使用 github.com/segmentio/kafka-go lib。 我的服务托管在 Kubernetes 集群上,我的用例是,我有一个消费者组 C1,它正在使用来自主题 T1 的消息,该主题有 70
组 ID 未注册的应用程序信息 kafka.consumer
我在使用 Spring Cloud Stream 的 Kafka 消费者服务的日志中多次看到这个特定的日志“App info kafka.consumer for group-id unregistered”。我无法理解...
我在卡夫卡消费者端遇到了下面的异常。令人惊讶的是,这个问题与旧版本的代码不一致(具有完全相同的配置,但有一些新的不相关...
Kafka 3.4,带有 Kraft,没有 Zookeeper,有 3 个经纪人
我需要 1 个 kafka 集群和 3 个 kraft borker。 我尝试使用下面的 docker 配置: 第一个经纪人的配置: 版本:“3” 服务: 卡夫卡-1: 图片:“bitnami/kafka:3.4.0” 主机...
我有一个线程,偶尔会列出消息中心上的主题。但有时,我会收到一条 :Failed to send SSL Close 消息。 有任何想法吗? 卡夫卡消费者 我有一个线程,偶尔会列出消息中心上的主题。但偶尔,我会收到一条 :Failed to send SSL Close 消息。 有什么想法吗? KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfiguration()); try { Map<String, List<PartitionInfo>> topics = consumer.listTopics(); return new ArrayList<String>(topics.keySet()); } finally { if (consumer != null) { **consumer.close();** } } 我收到来自 consumer.close 的警告。 消费者的配置: sasl.mechanism = PLAIN 安全协议= SASL_SSL group.id = 消费者1 ssl.enabled.protocol = TLSv1.2 ssl.endpoint.identification.algorithm = HTTPS ssl.协议 = TLSv1.2 sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule 需要用户名=“用户名”密码=“密码”; [警告] 2018-01-25 20:12:23.204 [ClusterChannelMonitorTaskThread] org.apache.kafka.common.network.SslTransportLayer {} - 发送失败 SSL 关闭消息 java.io.IOException:返回了意外状态 SSLEngine.wrap,预期关闭,收到正常。不会发送关闭 给同行的消息。在 org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:158) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.common.utils.Utils.closeAll(Utils.java:663) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:59) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.common.network.Selector.doClose(Selector.java:582) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.common.network.Selector.close(Selector.java:573) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.common.network.Selector.close(Selector.java:539) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.common.network.Selector.close(Selector.java:250) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:505) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.close(ConsumerNetworkClient.java:439) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:71) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1613) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1573) [kafka-clients-0.11.0.0.jar:?] 在 org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1549) [kafka-clients-0.11.0.0.jar:?] 在 com.ibm.saas.msg.kafka.KafkaMessageService.listChannelNames(KafkaMessageService.java:305) [saas-msg-kafka-TRUNK-SNAPSHOT.jar:TRUNK-SNAPSHOT] 未发送警报 SSL。 SSL/TLS 协议用于安全通信,与关闭消息相关。 SSL/TLS 协议规定必须向对方提供“close_notify”警告,告知其连接关闭。此警告消息表明 Kafka 客户端无法发送此“close_notify”警报。 这通常是无害的警告,可以忽略。这并不表明 Kafka 或您的应用程序有问题。它只是表明连接结束时没有发送“close_notify”警告。 更新您的 Kafka 客户端:当前版本的 Kafka 客户端可能存在已修复的问题。考虑更新到最新的 Kafka 客户端版本。 修改您的 SSL/TLS 配置:问题可能是由您正在使用的特定 SSL/TLS 版本或您选择的密码套件引起的。尝试调整 SSL.protocol 或 SSL.enabled.protocols 的设置。 尝试在代码中尽快关闭 Kafka 消费者,这样它就不会关闭应用程序的其余部分。这可能允许用户在应用程序终止之前提交“close_notify”警报。
我有一个关于多个分区的主题,并且该主题的 4 个消费者都在同一个消费者组中。如果一个消费者暂停该主题,其他三个消费者也会暂停。如果有办法...
我在 Spring Boot 中有一个 Kafka Consumer api,因此没有对其进行特定调用,因为它是 Kafka Consumer Listener。这样的 api 可以定义或记录开放的 api 规范吗? 由于 api 没有
我有很多 ipfix(netflow) 记录插入到 Kafka 中,我已经使用此代码通过 go 语言创建了消费者 包主 进口 ( “语境” “数据库/sql” ...
Kafka >= 0.10.1 的 session.timeout.ms 和 max.poll.interval.ms 之间的差异
我不清楚为什么我们需要 session.timeout.ms 和 max.poll.interval.ms 以及何时使用其中之一或两者?似乎这两个设置都指示了坐标时间的上限...
Docker-compose:ModuleNotFoundError:没有名为“core”的模块
我使用fastapi、docker、docker-compose、kafka。 我尝试启动模块消费者: docker-compose up 消费者模块 但我收到 ModuleNotFoundError: ModuleNotFoundError:没有名为“core”的模块 ...
我正在尝试在 k8s 集群上运行 Kafka connect,我面临的问题是在提交连接器期间。我正在运行 4 个不同主题的任务,并希望将数据存储到 S3。 我创建了 4 个不同的...
请帮忙处理kafka协议。不知道为什么请求不起作用。 我认为这个问题出现在字符串的紧凑版本中。但不知道具体原因。 我向 Kafka 发送 FindCoordinator v3 请求并