用于与Apache Kafka生产者API相关的问题。有关制作Kafka主题的任何问题。生产者失败和恢复,幂等性和事务性API。
Kafka Java Producer API无法将密钥序列化为Long或Int
这是在Kafka中产生数据的Java代码:import org.apache.kafka.clients.producer。*;导入org.apache.kafka.common.serialization.LongSerializer;导入org.apache.kafka.common ....
有4个引导服务器,我正在使用下面的代码producer.send(new ProducerRecord <>(topic,partitionNumber,key,message,headers,(metadata,exception)-> ...
在Confluent kafka中,在.Net的Message Producer中传递对象
我正在使用.net core 3.1,并通过Confluent.Kafka使用融合的kafka lib;我正在实施kafka系统,同时创建生产者和消费者。我知道我可以轻松地做某事...
[通过kafka definitve指南时,我碰到这个短语。当键为null且使用默认分区程序时,记录将发送到...的可用分区之一]]
为什么kafka生产者(性能测试)具有如此低的吞吐量/高延迟?
我是kafka的新手,正在运行一些性能测试。我正在运行一个由两台计算机组成的集群,其中包括我的笔记本电脑和一个覆盆子pi零W(1 GHz,单核CPU,512 MB RAM,802.11n无线局域网)。 ...
我在KAFKA Transactions中产生的数据如下所示:ConsumerRecord(topic ='Transactions',partition = 0,offset = 3,timestamp = 1591277946735,timestamp_type = 0,key = None,value = {'transaction_id':. ..
我有2个表,可以说Emp和Courses表。 Emp有3万行,课程有10万行。 1名员工可以有很多课程,即一对多关系。我需要从表中获取记录,然后...
将writeStream放电到kafka-awaitTermination()与awaitAnyTermination()之间的差异
根据官方文档,我使用下面的代码段来编写kafka主题,但未将其写入kafka。 finalStream = final \ .writeStream \ .format(“ kafka”)\ .option(“ ...
如何删除引号并像原始格式一样发送数据原始JSON格式为:{“ @timestamp”:“ 2020-06-02T09:38:03.183186Z”}此数据在另一个主题“ {\” @ timestamp中\“:\” 2020-05 -...
我正在做一个副项目,在该项目中,我将运输数据提取到kafka集群中。数据来自我市的公共API。例如:城市中的每条道路都在工作。我正在抓取道路工程...
我正在做一个副项目,在该项目中,我将运输数据提取到kafka集群中。数据来自我市的公共API。例如:城市中的每条道路都在工作。我正在抓取道路工程...
我正在使用Long序列化程序作为键,并使用String序列化程序作为value,在我们检索消息时将消息发布到kafka主题后,连同键一起将键看作是一些垃圾...
我有一个用例,需要用Kafka Console Producer发送键值消息。那么如何通过Kafka Console Producer命令实现这一目标?
我在GCP中创建了一个虚拟机,并在其中设置了kafka。我已将通告的侦听器更改为虚拟机的外部公共IP。但是,无论何时我尝试从本地笔记本电脑进行连接,我都会不断得到...
我正在使用带有kafka的Spring引导程序来设置我的项目。但是当我运行它时,它将给出org.apache.kafka.common.errors.SerializationException:无法将类java.lang.Integer的键转换为类...
如何配置Kafka键SERIALIZER,对于有时键很长而其他时间键是String的情况?
我有kafka生产者配置,直到这次我将密钥作为String类型发送,并配置了SERIALIZER密钥,如下所示,configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,...
我试图在不传递任何分区值的情况下向Kafka生成记录,但想发送标头,并且我在下面有重写方法来向Kafka生成记录,ProducerRecord(java.lang.String ...
我有以下代码段:groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds))。grace(Duration.ZERO)); KTable mergedTable = ...
系统包含用户实体。每个用户都可以按类型获取消息。如何在卡夫卡组织这件事?我知道,我可以创建主题消息并通过关键的用户ID存储消息,但是如果有百万用户呢? ...
使用发布/订阅如何确认消息?当消息以唯一组的形式发送给某些使用者时。这是否表示所有消费者都确认了消息或所有消息都得到了确认...