Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。
我正在为 kafka 主题定义一个带有 Enum 字段的 avro 模式。 avro 模式将上传到 kafka 模式注册表。 我在 github 存储库中有一个 json 文件,定义如下:
我正在为 kafka 主题定义一个带有 Enum 字段的 avro 模式。 avro 模式将上传到 kafka 模式注册表。 我在 github 存储库中有一个 json 文件,定义如下:
我一直在尝试完成一个项目,其中我需要使用kafka将数据流发送到本地Spark来处理传入的数据。但是我无法显示和使用右侧的数据框...
我在 Kubernetes 集群中运行 Kafka,并使用 Promtail 将 Kafka 消息发送到 Loki。现在,从生成消息到 Loki 收到消息之间大约有 5 秒的延迟……
我们在项目中使用 Confluence Platform 5.5 社区版,有 4 个 Broker 和 3 个 Zookeeper。 我们想在现有集群中再添加一个代理。添加代理后我们如何进行同步
无法使用Spring Cloud Kafka Binder处理消息
在下面的代码中,我尝试通过 REST 端点调用 processOrder() 来创建消息。然后,我想将 processOrder() 的结果传递给 processShipping() 和 processPayment。 然而...
AKS 中部署的微服务无法连接到 AKS 中部署的 kafka
我正在将 Spring 微服务项目部署到 AKS,其余服务工作正常,但是当我部署使用 Kafka 的服务时,在我使用 helm 单独部署 Kafka 后, 我重复了
Spring Cloud Stream Kafka Binder - 在批处理模式下使用 DLQ 时重试不起作用
我使用的是Spring Cloud版本2023.0.1(Spring Cloud Stream版本4.1.1),我以批处理模式编写了一个简单的kafka消费者来模拟错误场景。 @豆 消费者 我使用的是Spring Cloud Version 2023.0.1(Spring Cloud Stream版本4.1.1),并且我以批处理模式编写了一个简单的kafka消费者来模拟错误场景。 @Bean Consumer<Message<List<String>>> consumer1() { return message -> { final List<String> payload = message.getPayload(); final MessageHeaders messageHeaders = message.getHeaders(); payload.forEach(System.out::println); payload.forEach(p -> { if(p.startsWith("a")) { throw new RuntimeException("Intentional Exception"); } }); System.out.println(messageHeaders); System.out.println("Done"); }; } 我的application.yml文件看起来像这样 spring: cloud: function: definition: consumer1; stream: bindings: consumer1-in-0: destination: topic1 group: consumer1-in-0-v0.1 consumer: batch-mode: true use-native-decoding: true max-attempts: 3 kafka: binder: brokers: - localhost:9092 default: consumer: configuration: max.poll.records: 1000 max.partition.fetch.bytes: 31457280 fetch.max.wait.ms: 200 bindings: consumer1-in-0: consumer: enableDlq: true dlqName: dlq-topic dlqProducerProperties: configuration: value.serializer: org.apache.kafka.common.serialization.StringSerializer key.serializer: org.apache.kafka.common.serialization.StringSerializer configuration: key.deserializer: org.apache.kafka.common.serialization.StringDeserializer value.deserializer: org.apache.kafka.common.serialization.StringDeserializer 我还指定了 ListenerContainerWithDlqAndRetryCustomizer 来自定义重试 @Bean ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) { return new ListenerContainerWithDlqAndRetryCustomizer() { @Override public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group, @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver, @Nullable BackOff backOff) { ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template, dlqDestinationResolver); container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff)); } @Override public boolean retryAndDlqInBinding(String destinationName, String group) { return false; } }; } 问题 当发生错误时,消息批直接进入DLQ。并且不会尝试重试。 但是问题是,可能会出现暂时性错误,导致批处理处理失败,我希望在将批处理发送到 DLQ 之前重试几次。但我无法让它工作。 我做错了什么? 万一将来有人偶然发现这个问题,我就知道出了什么问题。 我必须从 enableDlq 文件中删除 dlqName、dlqProducerProperties 和 application.yml。 然后就成功了。 在java代码中,我还删除了ListenerContainerWithDlqAndRetryCustomizer并只使用了ListenerContainerCustomizer。 代码看起来像这样: @Bean public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) { return (container, dest, group) -> container.setCommonErrorHandler(errorHandler); } @Bean public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 4)); } @Bean public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> stringTemplate, KafkaOperations<?, ?> bytesTemplate, KafkaOperations<?, ?> longTemplate) { Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>(); templates.put(String.class, stringTemplate); templates.put(byte[].class, bytesTemplate); templates.put(Long.class, longTemplate); return new DeadLetterPublishingRecoverer(templates); } @Bean public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate<>(pf, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)); } @Bean public KafkaTemplate<String, String> bytesTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate<>(pf); } @Bean public KafkaTemplate<String, Long> longTemplate(ProducerFactory<String, Long> pf) { return new KafkaTemplate<>(pf, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)); }
Kafka Streams List Serdes:始终为空
我做错了什么? 我正在编写一个 ProcessorSupplier,用于将 n 条记录聚合为一条。为此,我正在使用 List Serdes ... 我的问题是 ArrayList 总是空的。 使用 Java 21 和 Ka...
Smallrye Kafka - 每个通道定义代理 - 不消耗任何事件
我在 Quarkus 应用程序中使用 Smallrye-Kafka 连接到 kafka。到目前为止,要求是“拥有一个对所有通道都有效的引导服务器”。配置...
在从 Kafka 接收消息的过程中,当我使用 Map.of() 创建空映射时,可能会导致内存泄漏或更密集的 CPU 使用? 可以有多个侦听器,但是...
如何在 AWS devezium 中设置 LSN 编号以在 LSN 后启动事件日志
如何在 AWS devezium 中设置 LSN 编号以在 LSN 之后启动事件日志,就像我在表 Employee 中有 100 条记录,在备份后,我有这 100 条记录并恢复它,现在我开始...
我想了解Kafka的整体结构是被认为是事件驱动的还是基于事件驱动+轮询的。 当 kafka 代理收到消息时,它会向消费者发送事件还是......
在 Citrus Kafka Endpoint 中接收消息时将 AVRO 负载转换为 JSON
我正在做一个 PoC 来验证是否可以使用 Citrus 框架进行自动化测试,这需要检查 Kafka 主题中是否已发布一些数据。 对于演示,我使用 Kotlin,我也使用
如果在处理过程中抛出异常,Kafka-streams 是否会提交偏移量?
我有一个 Kafka 流应用程序,我创建了自定义生产/消费/未捕获的异常处理程序,所有处理程序都返回 CONTINUE 以继续处理,尽管出现异常。 Kafka 流通讯吗...
如何通过 docker-compose 运行纯 Kafka 和 Kafka Connect?
我正在编写我的第一个基于 Kafka 的项目,并遇到一些集成问题。卡夫卡和卡夫卡连接。这是我的 docker-compose.yaml 版本:'3.3' 服务: 数据库: 图片:'postg...
在最后一天左右的时间里,我尝试使用融合的 docker 镜像在本地设置一个节点的 Kafka 集群。不幸的是,一直未能做到这一点。以下是我的所有配置文件: /etc/卡夫卡/
连接到kafka代理时出现错误,汇合控制台日志显示此错误。 [2018-03-22 11:46:03,545] 警告无法发送 SSL 关闭消息(org.apache.kafka.common.network。
Kafka + 使用 prometheus 与 kafka cli 命令有冲突
我们使用 Prometheus 一段时间并且非常享受它。 关于什么是 jmx-exporter 的几句话 jmx-exporter 是一个从基于 JVM 的应用程序(例如 Java 和 Scala)读取 JMX 数据的程序...
从 kafka 连接 API 获取任务 ID 以在日志中打印
我有一个kafka连接接收器代码,下面的json作为curl命令传递来注册任务。 如果有人知道如何获取我的连接的任务 ID,请告诉我。例如在