用于与Apache Kafka使用者API相关的问题
Kafka可以监控消费者的心跳并通过持久化在队列中重试作业(最终将其发送给不同的消费者)吗? 我还知道 SQL 表可以用作此 p 的队列...
Python 中的 ConfluenceKafka:使用消费者到消费者记录集
我已经开始学习confluence kafka(python)。有 1 个生产者、1 个主题、1 个分区和 1 个消费者(简单设置)。我的要求是我希望集中获取数据。我读到了使用...
默认KafkaConsumerFactory - ConcurrentHashMap - NullPointerException
我正在使用kafka消费者。我从 Spring boot 1.5 升级到 2.6。现在,当我运行应用程序时,它无法开始抛出 NullPointerException 。请告诉我是否有人可以帮助...
我正在使用 Kafka v0.10.1.1 和 Spring-boot。 我正在尝试使用以下生产者代码在 Kafka 主题移动用户中生成消息: 主题移动用户有5个分区和2个复制fa...
(我正在编辑问题,因为我认为它不够清楚) 如何对我的 kafka 消费者进行负载测试? 我看过很多关于负载测试 apache kafka 的文章,但没有一篇关于负载测试 con...
Spring-kafka:打印RecordHeaders的正确方法是什么?
刚刚检查了日志,发现 ConsumerRecord 标头以绝对不可读的方式打印,原因是 RecordHeader 将值保存为 byte[],结果在日志中我看到了
在 Spring Boot 应用程序中实现反应式 Kafka 监听器
我正在尝试在 Spring boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例: https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/...
尝试使用 confluence_kafka.AvroConsumer 来消费给定时间戳的消息。 如果标志: # 创建一个列表 topic_partitions_to_search = 列表( 映射(lambda p:TopicPartition('
如何消费kafka消息上的最后一条消息或者根据时间戳消费消息?
def kafkaa(自我,auto_offset_reset,超时= 500): group_name = "群组名称" config = {“bootstrap.servers”:“服务器”, “是...
我正在尝试实现动态 Kafka 异常处理,特别是在某些条件下重试,但找不到任何方法来过滤不应该仅由其类重试的异常。我的...
我正在使用spring kafka,并且我有一个用java spring boot编写的kafka消费者。我的消费者按批次消费,相关配置 bean 如下所示。 @豆 公共消费者工厂 我正在使用 spring kafka,并且我有一个用 java spring boot 编写的 kafka 消费者。我的消费者批量消费和相关配置bean如下。 @Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> config = new HashMap<>(); // default configs like bootstrap servers, key and value deserializers are here config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5"); return new DefaultKafkaConsumerFactory<>(config); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.DEBUG); factory.setBatchListener(true); return factory; } 我使用消息并将这些消息发送到 API 端点。如果 api 不可用或者其余模板抛出错误,我想将整个批次发送到 DLT 而无需重试。 我想要做的是将整个批次发送到 DLT,而不重试。如果我们抛出 BatchListenerFailedException,则批次中拥有特定索引号的消息将发送到 DLT。在 BatchListenerFailedException 中,我们只能传递一个整数值作为索引值,而不是一个列表。但我想要的是将整个批次按原样发送到 DLT 主题,而无需重试。有办法实现吗? 我的Spring Kafka版本是2.8.6 编辑 我的默认错误处理程序如下 @Bean public CommonErrorHandler commonErrorHandler() { ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries = new ExponentialBackOffWithMaxRetries(5); exponentialBackOffWithMaxRetries.setInitialInterval(my val); exponentialBackOffWithMaxRetries.setMultiplier(my val); exponentialBackOffWithMaxRetries.setMaxInterval(my val); DefaultErrorHandler errorHandler = new DefaultErrorHandler( new DeadLetterPublishingRecoverer(kafkaTemplate(), (record, exception) -> new TopicPartition(record.topic() + "-dlt", record.partition())), exponentialBackOffWithMaxRetries); errorHandler.addNotRetryableExceptions(ParseException.class); errorHandler.addNotRetryableExceptions(EventHubNonRetryableException.class); return errorHandler; } 在我的例子中使用ExponentialBackOffWithMaxRetries而不是FixedBackOff。就我而言,我有 3 个场景。 1 - 重试消息并将其发送到 DLT(抛出除 BatchListenerFailedException 之外的任何其他异常) 2 - 将批次中的几条消息发送到 DLT,无需重试(为此使用 BatchListenerFailedException) 3 - 将整个批次发送到 DLT,无需重试。 第三个是我苦苦挣扎的地方。如果我发送一些其他异常,那么它会重试几次。 (即使我用FixedBackOff代替ExponentialBackOffWithMaxRetries) 扔BatchListenerFailedException以外的其他东西;将 DefaultErrorHandler 与 DeadLetterPublishingRecoverer 一起使用,无需重试 (new FixedBackOff(0L, 0L))。 编辑 从版本3.0.0、2.9.3、2.8.11开始,您可以为批量错误配置不可重试的异常。 https://github.com/spring-projects/spring-kafka/issues/2459 看 /** * Add exception types to the default list. By default, the following exceptions will * not be retried: * <ul> * <li>{@link DeserializationException}</li> * <li>{@link MessageConversionException}</li> * <li>{@link ConversionException}</li> * <li>{@link MethodArgumentResolutionException}</li> * <li>{@link NoSuchMethodException}</li> * <li>{@link ClassCastException}</li> * </ul> * All others will be retried, unless {@link #defaultFalse()} has been called. * @param exceptionTypes the exception types. * @see #removeClassification(Class) * @see #setClassifications(Map, boolean) */ @SafeVarargs @SuppressWarnings("varargs") public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) { add(false, exceptionTypes); notRetryable(Arrays.stream(exceptionTypes)); } 请注意,2.8.x 现已不再支持 OSS。 https://spring.io/projects/spring-kafka#support 您能就第二个场景的实施与我联系吗?这将是一个很大的帮助。 谢谢
我的 Kafka Consumer 端有 2 个主题:主题 A 和主题 B。两者都采用相同的方法。详细方法如下; @Slf4j @服务 公共类 KafkaConsumerService { @KafkaListe...
我的 Kafka Consumer 端有 2 个主题。主题 A 和主题 B。两者都采用相同的方法。详细方法如下; @Slf4j @服务 公共类 KafkaConsumerService { @KafkaListe...
Kafka Consumer 无法使用 KafkaJS 在本地工作
我正在尝试使用 KafkaJS 在本地运行 Kafka 消费者。它表明消费者正在运行,但它没有从生产者主动向其中推送事件的主题进行消费。消费者
我有一个简单的分布式系统架构,其中一个生产者系统将事件写入一个 kafka 主题。这些事件基本上只被一个系统消耗。这位消费者整晚都在加载...
据我所知, kafka中引入分区和(消费者)组的概念来实现并行性。我正在通过 python 与 kafka 合作。我有一个特定的主题,它有(比如说)2 个分区。
我有一个 Spring Boot 应用程序,它有一个带有 @KafkaListener 的简单 Consumer。我有阻止重试逻辑,可以按预期工作,但如果出现反序列化异常,我想存储...
我的 apache kafka 有问题,问题是; 当我从后端(spring-boot)向我的主题发送消息时,python 客户端不会立即收到它,而且根本没有收到......
使用 subscribe() 和 allocate() 读取 kafka 主题的奇怪区别
我的任务是统计Kafka主题中的消息(有些有一个分区,有些有多个分区)。我尝试了两种技术:一种使用 subscribe(),另一种使用 allocate()。 完整代码: #!/usr/bin/env py...
使用kafkaListener和syncCommit,但它们不起作用
我有一个应用程序,它侦听 Kafka 主题并在 1 个线程中从中读取 1 条消息。读完一条消息后,有一定的逻辑,期间有两个选择——一切都是