apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

使用TopologyTestDriver和Micronaut对Kafka流进行单元测试。

当我试图运行一个基本的单元测试时,我得到以下错误:java.lang.IllegalArgumentException: Unknown topic: streams-wordcount-output at org.apache.kafka.streams.TopologyTestDriver...。

回答 1 投票 0

无法解析符号 "StreamsBuilder"。

我试图用Java创建一个Kafka Streams应用程序,但在管理依赖关系方面遇到了困难。请允许我说,我对Java和它的工具生态系统完全是个新手。在...

回答 1 投票 1

如何使用自定义的Kafka Streams ProductionExceptionHandler?

我正在开发一个kafka-streams应用程序,但在覆盖默认的ProductionExceptionHandler方面遇到了麻烦。我已经创建了一个实现ProductionExceptionHandler的类。

回答 1 投票 1

KafkaStreams。在KStream-KTable Join中处理Deserialize异常。

假设我们正在做一个KStream和KTable之间的内部连接,如下所示。StreamsBuilder sb = new StreamsBuilder(); JsonSerde sensorMetaDataJsonSerde = new ...

回答 1 投票 0

如何像使用关系型数据库一样使用Kafka?

今天的好时间。我很抱歉我的英语不好。我有一些问题,你能帮助我了解我如何能使用kafka和kafka流像数据库。我的问题是我有一些微服务,每个...

回答 1 投票 -1

减少Spring Cloud Kafka Streams应用中的锅炉模板

在我的Spring Cloud Kafka Streams应用中,我想加入一个KTable 到一个KT表 . 我的应用程序的输入已经使用MessagePack编码,所以我已经为Serdes定义了 ...

回答 1 投票 0

Kafka Streams中流媒体加入的默认WindowBytesStoreSupplier是什么?

新API的签名是:join(KStream)。 otherStream,ValueJoiner super V,? super VO,? extends VR> joiner,JoinWindows windows,StreamJoined。 streamJoined)如果我...

回答 1 投票 1

Kafka流。谁能给我指出一个完全实现的java例子,向DLQ(死信队列)主题发送消息

在我的流应用中,我在另一个系统上有一个外部依赖,我想在我的流应用中发布一条消息到DLQ kafka主题,每当DeserializationProducer或任何外部...

回答 1 投票 0

Kafka流:NUM_STREAM_THREADS_CONFIG > 1是否会破坏分区的总排序?

我们开始吧。我有一个相当复杂的拓扑结构,包括各种连接,聚合,过滤器,地图等等。默认情况下,NUM_STREAM_THREADS_CONFIG参数等于1,这完全是决定性的。

回答 1 投票 1

是否使用Kafka Streams和or KSQL来对数据库中的数据流进行去正常化处理。

在网上做了很多阅读之后,我终于接触到了这个论坛。我的挑战是将数据库中通过CDC来源的事务性数据去正常化到Kafka中,然后再写出来成 ...

回答 1 投票 0

KafkaStreams Scala对2个字段进行分组以获得不同的计数。

在我的Kafka主题0中,我有以下Key-Value数据 -> {"UserId": "123", "DateAndTime": "2020-06-10T10:30:03.000Z"}。0 -> {"UserId": "123", "DateAndTime": "2020-05-10T10:30:03.000Z"}。1 -> {"...

回答 1 投票 -1

npm库kafka-node和kafka-streams之间的区别。

在一个项目中,Node.js应用连接到Kafka消息队列,从队列中获取所有消息。我搜索了一下,发现有两个包kafka-node和kafka-streams。请问哪个包适合这里?

回答 1 投票 0

KT表suppress(suppressed.untilTimeLimit())不包含指定时间的记录。

我已经实现了一个流处理应用程序,它可以进行一些计算和转换,并将结果发送到一个输出主题。之后,我从该主题中读取,我想抑制 ...

回答 1 投票 0

Kafka Streams 2.3到2.5的升级打破了Scala的编译功能

当从KafkaStreams库从2.3升级到2.5,保持相同的Scala版本时,运行时出现以下错误。[info] com.mypackage.dp.streams.applications...。

回答 1 投票 0

升级到kafka-streams:5.5.0-css(Apache Kafka 2.5.0)后,GlobalKTable的getting store崩溃了 [已解决] 。

我有一个使用GlobalKTable的Spring Boot App。直到更新到kafka-streams-5.5.0-css(Confluent Platform版本与Apache Kafka 2.5.0兼容),从5.3.2-css(Apache Kafka 2......)更新到kafka-streams-5.5.0-css之前,它工作得很好。

回答 1 投票 0

使用SendToDlqAndContinue spring kafka stream活页夹的序列化异常

我正在尝试在处理异常时将消息发送到DLQ,但是当我从Spring-boot-kafka-streams-binder @EnableBinding(...]使用SendToDlqAndContinue时,我不断收到序列化异常,这是由于<>

回答 1 投票 0

Kafka POM依赖关系问题-ClassNotFoundException:org.apache.kafka.test.TestCondition

我正在查看-java.lang.NoClassDefFoundError:集成测试中的org / apache / kafka / test / TestCondition。我认为这与存储库导入与kafka相关的软件包的方式有关。我是...

回答 1 投票 0

Kafka:与多个使用者的sendOffsetsToTransaction

对于Kafka项目,我使用消费/加工/生产模型,但有两个消费者。所以我想知道,是否有可能对具有唯一生产者的两个使用者使用sendOffsetsToTransaction()函数?...

回答 1 投票 0

访问Kafka流中聚合器内部的TimeWindow属性

我想在一个时间窗口内使用Kafka-Streams流式传输一个主题的最新记录,并且我希望将输出记录的时间戳设置为等于该记录的时间窗口的结尾...

回答 1 投票 2

Kafka StateStore共享最佳实践

创建处理器API拓扑时,我注意到Topology#addStateStore(StoreBuilder,String ...)接受多个处理器,这意味着一个状态存储可以由多个处理器共享。是...

回答 1 投票 -1

© www.soinside.com 2019 - 2024. All rights reserved.