apache-kafka 相关问题

Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。

Avro Schema,引用 json 文件中的枚举值

我正在为 kafka 主题定义一个带有 Enum 字段的 avro 模式。 avro 模式将上传到 kafka 模式注册表。 我在 github 存储库中有一个 json 文件,定义如下:

回答 1 投票 0

Avro Schema,参考 yaml 文件中的枚举值

我正在为 kafka 主题定义一个带有 Enum 字段的 avro 模式。 avro 模式将上传到 kafka 模式注册表。 我在 github 存储库中有一个 json 文件,定义如下:

回答 1 投票 0

Spark 传入 JSON 流处理

我一直在尝试完成一个项目,其中我需要使用kafka将数据流发送到本地Spark来处理传入的数据。但是我无法显示和使用右侧的数据框...

回答 1 投票 0

如何更改 Kafka 作业的 Promtail 抓取间隔

我在 Kubernetes 集群中运行 Kafka,并使用 Promtail 将 Kafka 消息发送到 Loki。现在,从生成消息到 Loki 收到消息之间大约有 5 秒的延迟……

回答 1 投票 0

添加额外的代理节点后Kafka主题分区同步

我们在项目中使用 Confluence Platform 5.5 社区版,有 4 个 Broker 和 3 个 Zookeeper。 我们想在现有集群中再添加一个代理。添加代理后我们如何进行同步

回答 1 投票 0

无法使用Spring Cloud Kafka Binder处理消息

在下面的代码中,我尝试通过 REST 端点调用 processOrder() 来创建消息。然后,我想将 processOrder() 的结果传递给 processShipping() 和 processPayment。 然而...

回答 1 投票 0

AKS 中部署的微服务无法连接到 AKS 中部署的 kafka

我正在将 Spring 微服务项目部署到 AKS,其余服务工作正常,但是当我部署使用 Kafka 的服务时,在我使用 helm 单独部署 Kafka 后, 我重复了

回答 1 投票 0

Spring Cloud Stream Kafka Binder - 在批处理模式下使用 DLQ 时重试不起作用

我使用的是Spring Cloud版本2023.0.1(Spring Cloud Stream版本4.1.1),我以批处理模式编写了一个简单的kafka消费者来模拟错误场景。 @豆 消费者 我使用的是Spring Cloud Version 2023.0.1(Spring Cloud Stream版本4.1.1),并且我以批处理模式编写了一个简单的kafka消费者来模拟错误场景。 @Bean Consumer<Message<List<String>>> consumer1() { return message -> { final List<String> payload = message.getPayload(); final MessageHeaders messageHeaders = message.getHeaders(); payload.forEach(System.out::println); payload.forEach(p -> { if(p.startsWith("a")) { throw new RuntimeException("Intentional Exception"); } }); System.out.println(messageHeaders); System.out.println("Done"); }; } 我的application.yml文件看起来像这样 spring: cloud: function: definition: consumer1; stream: bindings: consumer1-in-0: destination: topic1 group: consumer1-in-0-v0.1 consumer: batch-mode: true use-native-decoding: true max-attempts: 3 kafka: binder: brokers: - localhost:9092 default: consumer: configuration: max.poll.records: 1000 max.partition.fetch.bytes: 31457280 fetch.max.wait.ms: 200 bindings: consumer1-in-0: consumer: enableDlq: true dlqName: dlq-topic dlqProducerProperties: configuration: value.serializer: org.apache.kafka.common.serialization.StringSerializer key.serializer: org.apache.kafka.common.serialization.StringSerializer configuration: key.deserializer: org.apache.kafka.common.serialization.StringDeserializer value.deserializer: org.apache.kafka.common.serialization.StringDeserializer 我还指定了 ListenerContainerWithDlqAndRetryCustomizer 来自定义重试 @Bean ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) { return new ListenerContainerWithDlqAndRetryCustomizer() { @Override public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group, @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver, @Nullable BackOff backOff) { ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template, dlqDestinationResolver); container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff)); } @Override public boolean retryAndDlqInBinding(String destinationName, String group) { return false; } }; } 问题 当发生错误时,消息批直接进入DLQ。并且不会尝试重试。 但是问题是,可能会出现暂时性错误,导致批处理处理失败,我希望在将批处理发送到 DLQ 之前重试几次。但我无法让它工作。 我做错了什么? 万一将来有人偶然发现这个问题,我就知道出了什么问题。 我必须从 enableDlq 文件中删除 dlqName、dlqProducerProperties 和 application.yml。 然后就成功了。 在java代码中,我还删除了ListenerContainerWithDlqAndRetryCustomizer并只使用了ListenerContainerCustomizer。 代码看起来像这样: @Bean public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) { return (container, dest, group) -> container.setCommonErrorHandler(errorHandler); } @Bean public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 4)); } @Bean public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> stringTemplate, KafkaOperations<?, ?> bytesTemplate, KafkaOperations<?, ?> longTemplate) { Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>(); templates.put(String.class, stringTemplate); templates.put(byte[].class, bytesTemplate); templates.put(Long.class, longTemplate); return new DeadLetterPublishingRecoverer(templates); } @Bean public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate<>(pf, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)); } @Bean public KafkaTemplate<String, String> bytesTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate<>(pf); } @Bean public KafkaTemplate<String, Long> longTemplate(ProducerFactory<String, Long> pf) { return new KafkaTemplate<>(pf, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)); }

回答 1 投票 0

Kafka Streams List Serdes:始终为空

我做错了什么? 我正在编写一个 ProcessorSupplier,用于将 n 条记录聚合为一条。为此,我正在使用 List Serdes ... 我的问题是 ArrayList 总是空的。 使用 Java 21 和 Ka...

回答 2 投票 0

Smallrye Kafka - 每个通道定义代理 - 不消耗任何事件

我在 Quarkus 应用程序中使用 Smallrye-Kafka 连接到 kafka。到目前为止,要求是“拥有一个对所有通道都有效的引导服务器”。配置...

回答 1 投票 0

创建许多空映射时导致 Java 内存泄漏

在从 Kafka 接收消息的过程中,当我使用 Map.of() 创建空映射时,可能会导致内存泄漏或更密集的 CPU 使用? 可以有多个侦听器,但是...

回答 1 投票 0

如何在 AWS devezium 中设置 LSN 编号以在 LSN 后启动事件日志

如何在 AWS devezium 中设置 LSN 编号以在 LSN 之后启动事件日志,就像我在表 Employee 中有 100 条记录,在备份后,我有这 100 条记录并恢复它,现在我开始...

回答 1 投票 0

Kafka 事件驱动或基于轮询

我想了解Kafka的整体结构是被认为是事件驱动的还是基于事件驱动+轮询的。 当 kafka 代理收到消息时,它会向消费者发送事件还是......

回答 1 投票 0

在 Citrus Kafka Endpoint 中接收消息时将 AVRO 负载转换为 JSON

我正在做一个 PoC 来验证是否可以使用 Citrus 框架进行自动化测试,这需要检查 Kafka 主题中是否已发布一些数据。 对于演示,我使用 Kotlin,我也使用

回答 1 投票 0

如果在处理过程中抛出异常,Kafka-streams 是否会提交偏移量?

我有一个 Kafka 流应用程序,我创建了自定义生产/消费/未捕获的异常处理程序,所有处理程序都返回 CONTINUE 以继续处理,尽管出现异常。 Kafka 流通讯吗...

回答 2 投票 0

如何通过 docker-compose 运行纯 Kafka 和 Kafka Connect?

我正在编写我的第一个基于 Kafka 的项目,并遇到一些集成问题。卡夫卡和卡夫卡连接。这是我的 docker-compose.yaml 版本:'3.3' 服务: 数据库: 图片:'postg...

回答 1 投票 0

Kafka Broker 身份验证失败 - 凭据无效

在最后一天左右的时间里,我尝试使用融合的 docker 镜像在本地设置一个节点的 Kafka 集群。不幸的是,一直未能做到这一点。以下是我的所有配置文件: /etc/卡夫卡/

回答 2 投票 0

使用 ssl 访问 kafka 代理时出错

连接到kafka代理时出现错误,汇合控制台日志显示此错误。 [2018-03-22 11:46:03,545] 警告无法发送 SSL 关闭消息(org.apache.kafka.common.network。

回答 2 投票 0

Kafka + 使用 prometheus 与 kafka cli 命令有冲突

我们使用 Prometheus 一段时间并且非常享受它。 关于什么是 jmx-exporter 的几句话 jmx-exporter 是一个从基于 JVM 的应用程序(例如 Java 和 Scala)读取 JMX 数据的程序...

回答 5 投票 0

从 kafka 连接 API 获取任务 ID 以在日志中打印

我有一个kafka连接接收器代码,下面的json作为curl命令传递来注册任务。 如果有人知道如何获取我的连接的任务 ID,请告诉我。例如在

回答 2 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.