kafka-producer-api 相关问题

用于与Apache Kafka生产者API相关的问题。有关制作Kafka主题的任何问题。生产者失败和恢复,幂等性和事务性API。

Kafka Java Producer API无法将密钥序列化为Long或Int

这是在Kafka中产生数据的Java代码:import org.apache.kafka.clients.producer。*;导入org.apache.kafka.common.serialization.LongSerializer;导入org.apache.kafka.common ....

回答 1 投票 0

向kafka发送的消息,但消费者未收到其中的一些消息

有4个引导服务器,我正在使用下面的代码producer.send(new ProducerRecord <>(topic,partitionNumber,key,message,headers,(metadata,exception)-> ...

回答 1 投票 0

在Confluent kafka中,在.Net的Message Producer中传递对象

我正在使用.net core 3.1,并通过Confluent.Kafka使用融合的kafka lib;我正在实施kafka系统,同时创建生产者和消费者。我知道我可以轻松地做某事...

回答 1 投票 0

Apache Kafka-没有密钥的消息

[通过kafka definitve指南时,我碰到这个短语。当键为null且使用默认分区程序时,记录将发送到...的可用分区之一]]

回答 2 投票 3

为什么kafka生产者(性能测试)具有如此低的吞吐量/高延迟?

我是kafka的新手,正在运行一些性能测试。我正在运行一个由两台计算机组成的集群,其中包括我的笔记本电脑和一个覆盆子pi零W(1 GHz,单核CPU,512 MB RAM,802.11n无线局域网)。 ...

回答 1 投票 1

如何使用python在kafka使用者中聚合json数据?

我在KAFKA Transactions中产生的数据如下所示:ConsumerRecord(topic ='Transactions',partition = 0,offset = 3,timestamp = 1591277946735,timestamp_type = 0,key = None,value = {'transaction_id':. ..

回答 1 投票 0

从数据库中获取大数据集并将其发送到Kafka

我有2个表,可以说Emp和Courses表。 Emp有3万行,课程有10万行。 1名员工可以有很多课程,即一对多关系。我需要从表中获取记录,然后...

回答 2 投票 0

将writeStream放电到kafka-awaitTermination()与awaitAnyTermination()之间的差异

根据官方文档,我使用下面的代码段来编写kafka主题,但未将其写入kafka。 finalStream = final \ .writeStream \ .format(“ kafka”)\ .option(“ ...

回答 1 投票 0

如何在Python中产生JSON格式的Kafka消息

如何删除引号并像原始格式一样发送数据原始JSON格式为:{“ @timestamp”:“ 2020-06-02T09:38:03.183186Z”}此数据在另一个主题“ {\” @ timestamp中\“:\” 2020-05 -...

回答 1 投票 0

设计:将重复的状态发送到Kafka主题中

我正在做一个副项目,在该项目中,我将运输数据提取到kafka集群中。数据来自我市的公共API。例如:城市中的每条道路都在工作。我正在抓取道路工程...

回答 1 投票 0

设计:在主题中发送重复状态

我正在做一个副项目,在该项目中,我将运输数据提取到kafka集群中。数据来自我市的公共API。例如:城市中的每条道路都在工作。我正在抓取道路工程...

回答 1 投票 0

垃圾值作为kafka主题中的键发布

我正在使用Long序列化程序作为键,并使用String序列化程序作为value,在我们检索消息时将消息发布到kafka主题后,连同键一起将键看作是一些垃圾...

回答 1 投票 0

如何通过kafka控制台生产者发送密钥,价值消息

我有一个用例,需要用Kafka Console Producer发送键值消息。那么如何通过Kafka Console Producer命令实现这一目标?

回答 1 投票 0

即使配置了公告的侦听器也无法连接到远程kafka代理

我在GCP中创建了一个虚拟机,并在其中设置了kafka。我已将通告的侦听器更改为虚拟机的外部公共IP。但是,无论何时我尝试从本地笔记本电脑进行连接,我都会不断得到...

回答 1 投票 0

在Spring boot kafka中的ProducerConfigs中将ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG设置为IntegerSerializer时,它给出类强制转换例外

我正在使用带有kafka的Spring引导程序来设置我的项目。但是当我运行它时,它将给出org.apache.kafka.common.errors.SerializationException:无法将类java.lang.Integer的键转换为类...

回答 1 投票 0

如何配置Kafka键SERIALIZER,对于有时键很长而其他时间键是String的情况?

我有kafka生产者配置,直到这次我将密钥作为String类型发送,并配置了SERIALIZER密钥,如下所示,configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,...

回答 1 投票 0

向Kafka生产记录

我试图在不传递任何分区值的情况下向Kafka生成记录,但想发送标头,并且我在下面有重写方法来向Kafka生成记录,ProducerRecord(java.lang.String ...

回答 2 投票 2

Kafka Streams API:会话窗口不兼容的类型

我有以下代码段:groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds))。grace(Duration.ZERO)); KTable mergedTable = ...

回答 1 投票 0

Kafka如何向特定用户发送消息?

系统包含用户实体。每个用户都可以按类型获取消息。如何在卡夫卡组织这件事?我知道,我可以创建主题消息并通过关键的用户ID存储消息,但是如果有百万用户呢? ...

回答 2 投票 0

ack如何在Kafka中用于发布/订阅?

使用发布/订阅如何确认消息?当消息以唯一组的形式发送给某些使用者时。这是否表示所有消费者都确认了消息或所有消息都得到了确认...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.