Kafka 的所有示例 | 生产者显示ProducerRecord
的键/值对不仅是相同的类型(所有示例都显示
<String,String>
),而且是相同的值。例如:
producer.send(new ProducerRecord<String, String>("someTopic", Integer.toString(i), Integer.toString(i)));
但是在 Kafka 文档中,我似乎找不到解释键/值概念(及其潜在目的/效用)的地方。在传统消息传递(ActiveMQ、RabbitMQ 等)中,我总是在特定主题/队列/交换处触发消息。但 Kafka 是第一个似乎需要键/值对而不仅仅是常规字符串消息的代理。
所以我问:
要求生产者发送KV对的目的/用处是什么?
partitions 组成的分布式 log 的抽象。将日志拆分为多个分区可以横向扩展系统。
Keys 用于确定日志中附加消息的分区。而该值是消息的实际负载。在这方面,这些例子实际上并不是很“好”;通常你会有一个复杂的类型作为值(比如元组类型或 JSON 或类似的),并且你会提取一个字段作为键。
请参阅:http://kafka.apache.org/intro#intro_topics 和 http://kafka.apache.org/intro#intro_ Producers
一般来说,键和/或值也可以是null
。如果键是
null
,则会选择一个随机分区。如果值为
null
,则它can 具有特殊的“删除”语义,以防您为某个主题启用日志压缩而不是日志保留策略 (http://kafka.apache.org/documentation#compaction) .
如果没有密钥,同一密钥上的两条消息可能会发送到不同的分区,并由组中的不同消费者乱序处理。
https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
您可以在此处阅读有关序列化器/反序列化器的信息:
https://www.baeldung.com/kafka-custom-serializer
您需要了解的主要事情:这个序列化器/反序列化器与 JSON、AVRO 或 Protobuf 无关。不一样的“连载”
另一个有趣的用例
我们可以使用 Kafka 主题中的 key 属性来发送 user_ids,然后插入消费者来获取流事件(存储在值属性中的事件)。这可以让您处理用户事件序列的任何最大历史记录,以便在机器学习模型中创建功能。我仍然需要弄清楚这是否可能。将继续更新我的答案并提供更多详细信息。