给定两个实体
Agency
和 Configuration
:
class Agency {
Long id; // PK
UUID configurationId; // FK -> Configuration.id
// ...
}
class Configuration {
UUID id; // PK
// ...
}
各个主题分布在多个分区中。
现在假设以下拓扑定义从
Agency.configurationId
到 Configuration.id
的 KTable-KTable FK 左连接:
@Produces
public Topology buildTopology() {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
try (
final Serde<Long> longKeySerde = AggregatorSerdes.debeziumKeySerdeFromFieldId(Long.class); // This is basically a JSON Serde reading from a specific property
final Serde<UUID> uuidKeySerde = AggregatorSerdes.debeziumKeySerdeFromFieldId(UUID.class); // Same here, but for UUID
final Serde<Agency> agencySerde = AggregatorSerdes.debeziumValueSerdeFromFieldAfter(Agency.class); //
final Serde<Configuration> configurationSerde = AggregatorSerdes.debeziumValueSerdeFromFieldAfter(
Configuration.class); //
final Serde<AggregateAgency> aggregateAgencySerde = AggregatorSerdes.debeziumSerdeWithoutConfiguration(AggregateAgency.class) // This is a plain JSON Serde
) {
final KTable<Long, Configuration> configurations = streamsBuilder
.table( //
configurationTopicName, //
Consumed.with( //
longKeySerde, //
configurationSerde));
streamsBuilder //
.table( //
agencyTopicName, //
Consumed.with( //
longKeySerde, //
agencySerde)) //
.leftJoin( //
configurations, //
Agency::configurationId, //
AggregateAgency::new, //
TableJoined.as("agency-to-configuration")) //
.toStream(Named.as("agency-to-configuration-tostream")) //
.to(aggregateTopic, //
Produced.<Long, AggregateAgency>as("agency-to-configuration-producer") //
.withKeySerde(longKeySerde) //
.withValueSerde(aggregateAgencySerde));
}
return streamsBuilder.build(properties); // Defines topology.optimizations=all
}
此拓扑不会经常呈现预期结果,即使源主题中存在所引用的
Configuration
并不存在于相应的聚合对象中。这是源主题分区的结果,还是 FK 连接的误解/错误实现?
其他观察:
根据 Kafka UI,ID 为 0 的示例
Agency
位于分区 4 中,而其引用的 Configuration
位于分区 8 中。FK 连接的重新分区机制应将 Agency
与新的组合放在一起分区 8 的密钥也是为了允许连接操作。然而,在重新分区主题中查找 FK 显示它位于分区 0 中,从而有效地排除了连接。
这是否是基于 Kafka Streams 使用的分区路由机制与输入主题的初始生产者的分区路由机制的差异?这些是使用 Debezium 创建的。
不确定我是否完全理解你的意思
FK 连接的重新分区机制应该将该代理与新的组合键共同定位到分区 8,以便允许连接操作。
您指的是输入主题吗?输入主题不会被共同分区。 Agency-0 是该机构的 PK,如果该机构与相应的配置相比处于不同的输入主题分区中,那完全没问题。
FK 连接是使用内部“订阅”和“响应”主题实现的。 “订阅”主题将与右侧(配置)表主题共同分区,并且配置 ID 将用于写入该主题。因此,具有配置 8 的 Agency-0(位于输入主题分区 0 中)应写入“订阅”分区 8,以与右表中的配置 8 位于同一位置。 -- 如果找到连接匹配,则通过“响应”主题将“响应”发送回左侧,该主题与左输入表(机构)共同分区,因此我们将编写一个连接匹配进入“响应”分区-0,将其映射回原始输入机构。
参见https://www.confluence.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/了解更多详情。
对于内部订阅和响应主题的哈希,Kafka Streams 使用默认的 murmur2 哈希。您的输入数据是如何分区的?如果分区方式不同,您需要在连接中提供自定义分区器(从 3.1 版本开始支持,通过将TableJoined
对象传递到
join(...)
中)。另一个已知问题是架构注册表密钥格式的使用。连接是根据序列化字节计算的,因此,如果 schema-id(序列化键的一部分)不匹配,即使实际数据相同,连接也可能会失败。