我试图在Kafka(Apache Beam)中完全配置一次语义。以下是我将要介绍的更改:
制片人:
enable.idenpotence
= truetransactional.id
= uniqueTransactionalId消费者:
enable.auto.commit
= false
//将以下内容添加到使用者构建器:.commitOffsetsInFinalize()
.withReadCommitted()
在KafkaIO#write
构建器中添加了以下内容:
.withEOS(numShards, sinkGroupId)
有谁知道应该改变什么才能在Apache Beam KafkaIO中实现一次语义?
上面的配置看起来很好还是我误解了smth?
如果我不使用事务API,是否需要指定transactional.id
属性(因为我在apache beam中没有显式生成器)?
好吧,看起来我终于找到了符合我要求的正确设置。这是我最终得到的:
1)KafkaIO.Read
:
enable.auto.commit = false
更新使用者属性.withReadCommitted()
.commitOffsetsInFinalize()
2)KafkaIO#write
:
.withEOS(numShards, sinkGroupId)
它还将启用幂等性并为生产者设置引擎盖下的transactional.id
。因此,通过这样的设置,我们将在读取时具有至少一次的语义,并且在写入时具有完全一次的语义。