Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。
我在《Mastering Kafka Streams and ksqlDB》一书中遇到了以下两个短语,作者使用了两个术语,它们的真正含义是什么“压缩主题”和“
我正在尝试调试我们的生产 Kafka Streams 应用程序中的问题。 (简化的)拓扑看起来像这样 builder.stream("输入").groupByKey().reduce( (agg, val) -> &quo...
Apache-Kafka 中 AsyncKafkaConsumer 和 ClassicKafkaConsumer 的主要区别是什么
Kafka API 为 Kafka 消费者提供功能,允许应用程序读取和处理来自 Kafka 主题的消息,并用于需要消费消息的应用程序。那...
我有一台带有 mysql DB(未容器化)的 EC2 机器。现在我想将 CDC 事件流式传输到 Confluence 管理的 Kafka。我正在使用这个 docker compose 文件在同一个主机上启动连接平台...
能否使用confluence-kafka-go异步有序、无损地生成Kafka消息? [已关闭]
我还在 confluenceinc/confluence-kafka-go 上发布了类似的问题。 我正在使用 confluence-kafka-go 开发一项服务,该服务需要 Kafka 生产者支持以下功能: 在...
是否有一种有效的方法来生成 Kafka 消息,同时严格保留消息的顺序并确保使用 confluence-kafka-go 不会丢失消息?
我还在 confluenceinc/confluence-kafka-go 上发布了类似的问题。 我正在使用 confluence-kafka-go 开发一项服务,该服务需要 Kafka 生产者支持以下功能: 在...
有两个kafka主题 消息 图片 新闻主题中的消息可以包含图像 ID 列表,如下所示 { “id”:“新闻-1”, "title": "标题新闻-1", ...
我正在尝试实现 Kafka 生产者/消费者模型,并且正在考虑是否为每个主题创建单独的发布者线程优于让单个发布者处理多个
我在 Kafka 流应用程序中编写了以下代码: KGroupedStream groupedStream = Stream.groupByKey(); groupedStream.windowedBy( SessionWindows.with(Duration.ofSeconds(3))....
KAFKA 与 docker :集群离线,但容器已根据“to docker ps -a”启动
我是卡夫卡新手,我正在尝试做一些简单的事情。 我用docker运行kafka。 这是我的 docker-compose: 版本:“3.2” 服务: 动物园管理员: 图片:wurstmeister/zookeeper p...
将 AWS 上的 Apache kafka 与 GCP 上的 Spark 连接
我已在 GCP 上设置了一个 Dataproc 集群来运行 Spark 作业,并且 Spark 作业驻留在我已配置的 GCS 存储桶上。另外,我通过设置 MSK 集群在 AWS 上设置了 kafka...
我正在使用 Kafka 和 Spring-boot: 卡夫卡生产者类: @服务 公共类 MyKafkaProducer { @Autowired 私人 KafkaTemplate kafkaTemplate; 私人静态
我有格式良好的 JSON 数据,该数据保存在 Oracle 11g 表的 CLOB 字段中。 创建表 INTERFACE_KAFKA_QUEUE ( AUDIT_NUMBER 号, 消息块, DATE_INSERTED ...
Kafka JDBC 连接器 - 在 CLOB 字段中提取 JSON 并将其持久化到 Kafka 主题
我有格式良好的 JSON 数据,该数据保存在 Oracle 11g 表的 CLOB 字段中。 创建表 INTERFACE_KAFKA_QUEUE ( AUDIT_NUMBER 号, 消息块, DATE_INSERTED ...
我正在尝试设置一个Kafka集群(实际上是集群中的第一个节点)。 我有一个单节点 Zookeeper 集群设置。 我正在一个单独的节点上设置 kafka。 两者都运行 CentOS 6.4,ru...
如何将凭证从kafka和数据库传递到FlinkSessionJob
我在k8s中部署了一个flink操作器版本1.10,flink本身(FlinkDeployment)被部署为集群会话,我想部署几个不同的FlinkSessionJobs,并且对于每个我需要定义...
Neo4j 似乎围绕 cypher 命令包装了 CALL 查询
我第一次在独立机器上使用 Kafka Connect,本地托管的 kafka 代理和 Neo4j 数据库在同一台机器上运行。我正在尝试使用自定义 cypher q 将数据提取到 neo4j 中...
我想使用消息 KEY 或 VALUE 查找已发布的消息。如何搜索并打印该消息? 我已经尝试过以下一项,但没有任何效果: def filter_messages(自我, 主题,
我想使用消息的键或值在汇合的kafka主题中搜索消息,然后想使用python打印消息
我正在oracle数据库中执行更新事务,一旦更新完成,汇合的kafka消息应该在相应的主题中发布。 现在我想使用...来查找已发布的消息
当Kafka主题保留时间大于log.roll时间时会发生什么?
我只想更改一个 Kafka 主题的保留时间。我很好奇如果保留时间设置大于代理的 log.roll.ms 设置,如何处理删除。我用...