apache-kafka 相关问题

Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。

如何使用 Benthos 读取和解码来自 Kafka 的 AVRO 消息及其关联的 kafka 密钥?

我正在使用 Benthos 从 Kafka 读取 AVRO 编码的消息,其中 kafka_key 元数据字段设置为还包含 AVRO 编码的有效负载。这些 AVRO 编码有效负载的模式存储...

回答 2 投票 0

如何在特定时间范围内聚合KStream到固定大小的列表?

考虑这个 KStream: KStream inputStream = StreamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String())); 物化 考虑这个 KStream: KStream<String, String> inputStream = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String())); Materialized<String, List<String>, WindowStore<Bytes, byte[]>> with = Materialized.with(Serdes.String(), STRING_LIST_SERDE); KStream<Windowed<String>, List<String>> outputStream = inputStream .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2))) .aggregate( ArrayList::new, (key, string, aggregate) -> { aggregate.add(string); return aggregate; }, with) .toStream(); outputStream 将聚合来自 inputStream info 的所有消息,定义时间范围内的消息数组。 另外,现在我想将消息聚合到特定限制,例如直到列表大小不超过 50 条。 如果列表在聚合过程中变得大于 50,我想以某种方式将其拆分为附加列表。 基本上,我希望实现的输出是获取一组消息,其大小达到限制(例如 50 条),并且达到特定的时间范围,以先到者为准。 为了实现这一目标,我在这里缺少什么? 您可以尝试将 KTable 转换为 KStream 并执行 flatMapValues 来拆分列表,如下所示(Kotlin 中的代码): val s = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String())) val output = s .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2))) .aggregate({ mutableListOf() }, { k: String, str: String, agg: List<String> -> agg.plus(str) }) .toStream() .flatMapValues { strList -> strList.chunked(50) } 但这意味着您将整个聚合列表加载到内存中 - 可能会也可能不会是一个问题,具体取决于列表大小和您的内存设置,但这绝对是需要记住的事情。

回答 1 投票 0

将 Kafka 扩展到 1 个数据中心内的安全区域

我的数据中心内有 2 个安全区域。我想部署kafka,这样如果Security zone-1中有Broker-1发布的主题,那么该主题也可以在Security zone-...

回答 1 投票 0

Kafka Streams 用于聚合事件处理和对大量任务的可扩展性

我有以下任务:假设我正在开发一个由一百万用户使用的在线商店。用户在不同的时间进行购买。每个用户的每次购买都会变成一次交易事件。我...

回答 1 投票 0

Cypress 和 Kafka 自动化

我想知道是否有一些如何使用 cypress 和 apache kafka 进行自动化测试。 为了能够在代理中“获取”特定主题,请发送主题(生产者)并接收主题(

回答 1 投票 0

卡夫卡·卡夫|使用kafka-acls时没有配置Authorizer

使用 kafka-configs 创建一个 scram 用户: /bin/kafka-configs --bootstrap-server 代理:9092 --alter --add-config 'SCRAM-SHA-256=[password="password"]' --entity-type users --entity-name "

回答 1 投票 0

Kafka 连接 Amazon MSK

如何将 Kafka Connect 适配器与 Amazon MSK 结合使用? 根据 AWS 文档,它支持 Kafka 连接,但没有记录如何设置适配器和使用它。

回答 3 投票 0

Kafka docker 撰写外部连接[重复]

我想将 9093 暴露到 docker 容器之外。当我将 kafka-0 端口设置为暴露给 9093 和 KAFKA_ADVERTISED_LISTENERS 时,我无法连接到 localhost:9093,如下所示...

回答 1 投票 0

Docker Kafka 与 Python 消费者

我正在使用dockerized Kafka并编写了一个Kafka消费者程序。当我在本地计算机上的 docker 和应用程序中运行 Kafka 时,它运行得很好。但是当我在

回答 4 投票 0

无法从 Talend 中的 tKafkaInput 获取特定分区数据

我们有一个分布在 3 个分区的 Kafka 主题,我需要在一个作业中从分区 1 和另一作业中从分区 2 获取/读取数据。我尝试过在

回答 1 投票 0

Kafka:ZstdIOException:无法从 BufferPool 获取大小为 131075 的 ByteBuffer

我们有 3 个节点的 Kafka 集群,使用 bitnami/kafka:3.6.0 图像和 Kraft 协议。 我们还在主题和生产者上使用 zstd 压缩集。 突然,其中一个节点开始在错误之后发送垃圾邮件......

回答 1 投票 0

如何使用 ssl 捆绑配置 kafka Schema 注册表客户端

我正在寻找一种使用 ssl 捆绑包配置架构注册表客户端的方法。 我有一个向 kafka 主题发送消息的应用程序。 生产者是使用 keystore 和 trustore 配置的

回答 1 投票 0

Kafka 在发生 Broken Pipe 错误后不会重新加入集群

我有一个 Kafka 集群,有 3 个经纪人和 3 个动物园管理员。根据Kafka server.log,由于某种未知原因,它间歇性地遇到损坏的管道错误。断管错误消失后,inste...

回答 2 投票 0

将 Quarkus 与 Kafka 和模式注册表结合使用时,如何覆盖 KafkaAvroDeserializerConfig.SPECIFIC_AVRO_VALUE_TYPE_CONFIG

我们有一个 Quarkus 项目,它使用 Apache Kafka 以及 Confluence 模式注册表和 Avro。 它使用 Quarkus 代码生成机制从 Avro 模式生成 Java 文件,并且它消耗 fr...

回答 1 投票 0

需要在每个代理和控制器上运行 kafka-storage.sh

基本上就是标题。 多年来我们一直在使用 Zookeeper 运行 kafka。设置是形成 Zookeeper 集群,然后让代理连接到它。一切尽在傀儡之中。没有额外的步骤,经过一些

回答 1 投票 0

有没有办法让服务 A 处理数据消息并将其发布回同一主题,然后服务 B 来使用它?

我们刚刚开始使用 Kafka,我刚刚开始了解 Kafka 的不同功能。 我们有一个要求,通过 Kafka 主题发送的事件需要被一次性消耗掉......

回答 1 投票 0

如何自定义 Spring Cloud Stream Schema 注册表以与无 SQL 后端一起使用

我正在尝试将 Spring Cloud Stream SchemaRegistry 与 MongoDB 作为 SchemaRegistry 后端集成。我只是想知道目前的实施是否可行以及是什么样的

回答 1 投票 0

Kafka - 事件源/管道

我们刚刚开始使用 Kafka,我刚刚开始了解 Kafka 的不同功能。 我们有一个要求,通过 Kafka 主题发送的事件需要被一次性消耗掉......

回答 1 投票 0

向 KSQL 服务器发出 GET 时出错。连接到 ksql 服务器时的路径:/info

我尝试在本地使用ksql 这是我的 docker-compose 文件 卡夫卡: 图片:confluenceinc/cp-kafka:最新 容器名称:mvtds-transformer-kafka 取决于: - 动物园管理员 端口: - &quo...

回答 1 投票 0

克隆 Kafka / ZooKeeper 集群进行测试

我入职了一家新公司,他们只有两个环境,一个是测试环境,一个是生产环境。 在测试环境中,很多开发团队都在使用这个

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.