与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
您好我正在使用kafka会话窗口,非活动时间为5分钟。当达到非活动时间并且会话根据密钥下降时,我想要某种反馈。我假设我有(A,1)......
我们正在使用kafka streams的“windows join”加入2个流,我们想知道:为什么KS会在内部主题上添加+ 24小时?例如,我们有一个1小时的窗口,但内部主题有...
我一直在调查Kafka Streams应用程序上的磁盘写入,我将拓扑结构减少到最低限度,即:KStream stream = builder.stream(“input-topic”); ...
这是我目前遇到的一个问题。最近我一直在探索用于绘图的散景和用于流式传输的kafka。我想到使用它们制作样本实时仪表板。但是......
我正在编写一个应用程序,我正在尝试计算每小时访问一页的用户数。我正在尝试过滤到特定事件,按userId和事件小时时间分组,然后按...分组
Kafka Streams应用程序在kafka服务器上打开了太多文件
我一直在研究基于java kafka-streams API的应用程序,其目标是处理来自一个kafka主题的数据流,并将其生成另一个主题。看来,......
KSQL / Kafka Streams可以支持复杂事件处理吗?
我想对kafka中的事件进行用户行为分析。 KSQL / Kafka Streams现在可以支持复杂事件处理了吗?
KafkaStreams如何确定GlobalKTable是否在引导时完全填充?
我用来创建GlobalKTable的主题非常活跃。在KStream-GlobalKTable连接的文档中,我读到了GlobalKTable在(重新)启动KafkaStreams实例时完全自举,...
我知道使用Kafka使用者的api我们可以获得与特定时间戳(getOffsetsByTimes())相对应的偏移量。我们如何获得偏移量并从一个点开始重放流...
我正在Kafka Streams上进行POC,我正在使用Kafka Streams处理avro消息。事实是我的Avro消息混合了简单和复杂的类型,所以我发现很难......
这个错误是什么意思?无法在kafka代理中为流应用程序分区添加错误?每当我启动我的kafka流应用程序时,我都会看到太多这些错误。什么 ...
如何使用kafka和faust检查在给定时间段内是否已发送新记录
我正在使用包括汇合平台(docker)的测试设置,并使用以下信息处理记录:传感器ID,时间戳,值。使用robinhood的faust(类似于Kafka Streams ......
我正在使用spark从主题kafka获取数据。我必须使用KafkaAvroDeserialaizer对avro数据进行deserialaizer。我配置kafka使用者:kafkaParams.put(“bootstrap.servers”,“10.0.4.215:9092”); ...
Kafka Streams:topic.compression.type不是已知的配置
在Kafka Streams中添加压缩配置,与此链接类似:properties.put(StreamsConfig.topicPrefix(TopicConfig.COMPRESSION_TYPE_CONFIG),“snappy”);但我看到以下......
事件采购 - Apache Kafka + Kafka Streams - 如何确保原子性/交易性
我正在使用Apache Kafka Streams评估事件采购,以了解复杂场景的可行性。与关系数据库一样,我遇到过一些案例,原子性/事务性是......
使用STATE_CLEANUP_DELAY_MS_CONFIG
在StreamsConfig中有一个参数STATE_CLEANUP_DELAY_MS_CONFIG,它表示“在迁移分区时删除状态之前等待的时间量(以毫秒为单位)。只有状态目录......