用于与Apache Kafka生产者API相关的问题。有关制作Kafka主题的任何问题。生产者失败和恢复,幂等性和事务性API。
在恢复过程中,全局状态存储将从源主题中转储数据(这被认为是全局存储的变更日志主题)。对于删除记录,我做了如下的操作 kvStore.put("key-...
Kakfa消费者在我自己的偏移ID上的提交没有工作 - commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
我正在使用poll()从一个主题中获取一堆消息(比如100条),我在配置中设置了auto-commit为false,max.poll.records为100。我在配置中设置了auto-commit为false,max.poll.records为100。我将从100条消息中消耗10条消息 ...
在Alpakka中使用Transactional.Sink无法向Kafka主题生成消息,但我看到idempotent producer被启用了。
你好,我想使用Alpakka文档中的Producer api。使用Transactional source可以消费记录,制作人也创建了,但是不能把消息放到主题中。
我是Kafa和数据摄取的新手。我知道Kafka是容错的,因为它把数据冗余地保存在多个节点上。但是,我不明白的是,我们如何才能实现容错,在 ...
多个Kafka生产者对同一主题进行编写--如何平衡负载消耗?
所以我有一个设计,我有多个生产者P1,P2,P3,P4...。PN写到一个主题T1,有32个分区。在另一边,我有多达32个消费者在一个消费者组。...
producer-1] WARN o.apache.kafka.clients.NetworkClient: brokers may not be available
kafka-producer-network-thread)。
我有一个大的csv,我想写到一个kafka主题。 def producer(): producer = KafkaProducer(bootstrap_servers='mykafka-broker') with open('homeantonisrepostestfile.csv') as file: ...
Kafka streams Exception: org.apache.kafka.streams.errors.StreamsException - Deserialization异常处理程序。
我正试图用kafka流实现一个简单的计数器。它接受一个键值,如果相同的键值来了,它就会添加新的值。这是我目前写的代码 package exercises;import org.apache...。
spark writeStream into kafka - awaitTermination()与 awaitAnyTermination()之间的区别。
根据官方文档,我使用下面的代码段写入kafka主题,但它没有写入kafka。 finalStream = final.writeStream \ .format("kafka") \ .option("kafka......")。
如何在可配置的时间轴中以JSON格式获取ActiveInactive主题。
谁能解释一下,或者提供一些有用的链接,在Kafka中用Java获取activeinactive主题?
我在一个kafka主题里有一个kafka消息。这个消息的一个键key=ID,这个键的值是value=12345678910111213141。这个值的类型是整数。我想把类型转换为......
Spark Kafka Producer抛出太多打开的文件 Exception
我正在尝试运行一个用Java编写的Spark Kafka Job,以产生大约10K记录,每批到一个Kafka Topic。这是一个Spark批处理作业,它读取100个(共100万条记录)hdfs部分文件... ...
KafkaTimeoutError: 60.0秒后更新元数据失败。
我有一个高吞吐量kafka生产者的用例,我想每秒钟推送成千上万的json消息。我有一个3个节点的kafka集群,我使用最新的kafka-python库,并且有 ...
RD_KAFKA_PARTITION_UA在librdkafka中如何工作?
我在php中有一个生产者,它可以发布到队列中。我使用了下面的库:https:/github.comarnaud-lbphp-rdkafka下面是代码。$conf->set('log_level', ...
我在java上创建了一个kafka producer,工作正常,现在我想把产生的数据存储在mySQL数据库中,但我不知道怎么做。我试过这段代码,但它不工作 try { String ...
使用Kafka-Python将CSV数据发送到Kafka主题。消费者成功发送和接收数据。现在,我试图连续流式传输一个csv文件,添加到该文件的任何新条目都应...
Kafka:与多个使用者的sendOffsetsToTransaction
对于Kafka项目,我使用消费/加工/生产模型,但有两个消费者。所以我想知道,是否有可能对具有唯一生产者的两个使用者使用sendOffsetsToTransaction()函数?...
设置一个要在其上添加kafka侦听器的网络套接字,我们基本上有100个主题,而同一主题的13个不同部分,例如,“ BOOKX”是一个主题,其中13个章节是……
如何使用Python从不同的服务器连接Kakfa Consumer
我无法从Kafka服务器接收到我的使用者服务器的任何数据。使用kafka-python lib将Kafka Server托管在192.168.1.1中,将consumer托管在192.168.1.2中。以下是我的示例...
我正在探索反应式卡夫卡,只是想确认反应式卡夫卡是否等同于同步生产者。使用同步生产者,我们将获得所有ACK的消息传递保证,并且生产者序列为...