apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

在Kafka流中是否有任何冷启动持久存储的选项?

我一直在kafka-streams工作几个月。我们使用RocksDB来存储数据。现在,changelog主题只保留数天的数据,如果我们的应用程序的持久存储有数据......

回答 1 投票 0

Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby

如何在Kafka Streams API中使用带有多个约束的.groupby。与Java 8 Streams API示例相同,public void twoLevelGrouping(List 人的){final Map

回答 1 投票 6

在java中读取json列表

以前我用以下格式读取json数据:JSON {“CreationTime”:“2018-01-12T12:32:31”,“Id”:“08f81fd7-21f1-48ba-a991-08d559b88cc5”,“Operation”:“ AddedToGroup“,”......

回答 3 投票 0

Kafka流:将值连接到数组中

我有一个kafka流,它完成了KTable的缺失值(leftjoin完美地做到了这一点)。但有时,我必须将每个值连接成一个数组,我不知道该怎么做...

回答 1 投票 2

kafka Streams会话窗口

您好我正在使用kafka会话窗口,非活动时间为5分钟。当达到非活动时间并且会话根据密钥下降时,我想要某种反馈。我假设我有(A,1)......

回答 1 投票 1

Kafka Streams窗口加入了保留

我们正在使用kafka streams的“windows join”加入2个流,我们想知道:为什么KS会在内部主题上添加+ 24小时?例如,我们有一个1小时的窗口,但内部主题有...

回答 1 投票 3

KafkaStreams在读取主题时写入磁盘

我一直在调查Kafka Streams应用程序上的磁盘写入,我将拓扑结构减少到最低限度,即:KStream stream = builder.stream(“input-topic”); ...

回答 1 投票 3

使用散景绘图与kafka流媒体

这是我目前遇到的一个问题。最近我一直在探索用于绘图的散景和用于流式传输的kafka。我想到使用它们制作样本实时仪表板。但是......

回答 1 投票 0

Apache Kafka分组两次

我正在编写一个应用程序,我正在尝试计算每小时访问一页的用户数。我正在尝试过滤到特定事件,按userId和事件小时时间分组,然后按...分组

回答 1 投票 0

Kafka Streams应用程序在kafka服务器上打开了太多文件

我一直在研究基于java kafka-streams API的应用程序,其目标是处理来自一个kafka主题的数据流,并将其生成另一个主题。看来,......

回答 1 投票 1

KSQL / Kafka Streams可以支持复杂事件处理吗?

我想对kafka中的事件进行用户行为分析。 KSQL / Kafka Streams现在可以支持复杂事件处理了吗?

回答 1 投票 1

KafkaStreams如何确定GlobalKTable是否在引导时完全填充?

我用来创建GlobalKTable的主题非常活跃。在KStream-GlobalKTable连接的文档中,我读到了GlobalKTable在(重新)启动KafkaStreams实例时完全自举,...

回答 1 投票 1

KSQL / KStream - 根据生成时间获取偏移量

我知道使用Kafka使用者的api我们可以获得与特定时间戳(getOffsetsByTimes())相对应的偏移量。我们如何获得偏移量并从一个点开始重放流...

回答 1 投票 0

使用Kafka Streams处理复杂的Avro消息

我正在Kafka Streams上进行POC,我正在使用Kafka Streams处理avro消息。事实是我的Avro消息混合了简单和复杂的类型,所以我发现很难......

回答 1 投票 1

什么是Kafka经纪人日志中此错误的含义?

这个错误是什么意思?无法在kafka代理中为流应用程序分区添加错误?每当我启动我的kafka流应用程序时,我都会看到太多这些错误。什么 ...

回答 2 投票 1

如何使用kafka和faust检查在给定时间段内是否已发送新记录

我正在使用包括汇合平台(docker)的测试设置,并使用以下信息处理记录:传感器ID,时间戳,值。使用robinhood的faust(类似于Kafka Streams ......

回答 1 投票 2

KafkaAvroDeserializer的例外情况

我正在使用spark从主题kafka获取数据。我必须使用KafkaAvroDeserialaizer对avro数据进行deserialaizer。我配置kafka使用者:kafkaParams.put(“bootstrap.servers”,“10.0.4.215:9092”); ...

回答 2 投票 1

Kafka Streams:topic.compression.type不是已知的配置

在Kafka Streams中添加压缩配置,与此链接类似:properties.put(StreamsConfig.topicPrefix(TopicConfig.COMPRESSION_TYPE_CONFIG),“snappy”);但我看到以下......

回答 1 投票 1

事件采购 - Apache Kafka + Kafka Streams - 如何确保原子性/交易性

我正在使用Apache Kafka Streams评估事件采购,以了解复杂场景的可行性。与关系数据库一样,我遇到过一些案例,原子性/事务性是......

回答 2 投票 1

使用STATE_CLEANUP_DELAY_MS_CONFIG

在StreamsConfig中有一个参数STATE_CLEANUP_DELAY_MS_CONFIG,它表示“在迁移分区时删除状态之前等待的时间量(以毫秒为单位)。只有状态目录......

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.