Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。
为了实现Kafka消费者对消息的一次性处理,我一次只提交一条消息,如下面的public void commitOneRecordConsumer(long seconds){KafkaConsumer
这个错误是什么意思?无法在kafka代理中为流应用程序分区添加错误?每当我启动我的kafka流应用程序时,我都会看到太多这些错误。什么 ...
Kafka SMT ValueToKey - 如何使用多个值作为关键?
我正在使用Confluent JDBCSourceConnector从Oracle表中读取。我正在尝试使用SMT生成由3个连接字段组成的密钥。 transforms = createKey transforms.createKey.type = ...
我有一个Kafka Connect源和接收器连接器,用于将数据放入Kafka并将其取出。我使用docker-compose运行Kafka和Kafka Connect,它以分布式模式运行connect。 ...
在我的应用程序中,我将使用来自100多个主题的数据。我应该为每个主题创建一个消费者还是从所有主题中创建一个消费者消费者?将创造100个消费者创造100 ...
如何使用kafka和faust检查在给定时间段内是否已发送新记录
我正在使用包括汇合平台(docker)的测试设置,并使用以下信息处理记录:传感器ID,时间戳,值。使用robinhood的faust(类似于Kafka Streams ......
例如,如果我在我的一个KafkaConsumer中有一个长时间运行的进程。 (假设需要1小时才能完成。)如果触发了重新平衡,那么这个消费者的撤销操作会等到这个消费者......
我看到很多关于使用offsetsForTimes的答案。我不能只读取时间戳大于给定时间戳的消息,而不是去offsetsForTimes。像下面的东西......
Kafka Streams:topic.compression.type不是已知的配置
在Kafka Streams中添加压缩配置,与此链接类似:properties.put(StreamsConfig.topicPrefix(TopicConfig.COMPRESSION_TYPE_CONFIG),“snappy”);但我看到以下......
应用程序监听2 kafka主题userevent paymentevent Payload for userevent {“userId”:“Id_223”,“firstname”:“fname_223”,“lastname”:“lname_223”,“phonenumber”:“P98202384_223”,“usertimestamp”:“.. 。
如果配置中未指定源,则会抱怨。根据文件:Kafka频道可用于多种场景:使用Flume源和接收器 - 它提供了可靠的...
我使用基于JBoss的保险库来保护敏感数据,例如数据库凭证。我使用基于Java的HTTP REST客户端来创建分布式Kafka连接器,但最终得到了一个安全性......
我正在使用Kafka Consumer API来构建使用者。消息结构很复杂。为了构建反序列化器,我实现了Deserializer类并提供了必要的实现。我...
事件采购 - Apache Kafka + Kafka Streams - 如何确保原子性/交易性
我正在使用Apache Kafka Streams评估事件采购,以了解复杂场景的可行性。与关系数据库一样,我遇到过一些案例,原子性/事务性是......
我正在尝试使用Kafka作为消息代理来实现RPC架构。使用Kafka而不是另一个消息代理解决方案的决定由当前上下文决定。实际上 ...
当我看到此错误消息时:错误关闭代理,因为/ tmp / kafka-logs中的所有日志目录都已失败(kafka.log.LogManager)首先想到的是“/ tmp目录可能已清除...