何时使用Kafka交易API?

问题描述 投票:0回答:1

我试图了解Kafka的事务性API。 This link定义原子的读取过程-写入周期如下:

首先,让我们考虑一下原子级的read-process-write循环的含义。简而言之,这意味着如果应用程序在某个主题分区tp0的偏移量X处消耗了一条消息A,并在对消息A进行了一些处理(例如B = F(A))之后将消息B写到主题分区tp1,则仅当消息A和消息B被视为已成功使用并一起发布,或者根本不发布时,读-写-写周期才是原子的。

进一步说以下内容:

使用配置为至少一次传递语义的香草Kafka生产者和消费者,一旦以以下方式处理语义,流处理应用程序可能会完全丢失:

  1. producer.send()可能由于内部重试而导致重复写入消息B。这由幂等的生产者解决,而不是本文其余部分的重点。

  2. 我们可能会重新处理输入消息A,导致重复的B消息被写入输出,这完全违反了一次处理语义。如果流处理应用程序在写入B之后但将A标记为已使用之前崩溃,则可能会发生重新处理。因此,当恢复时,它将再次消耗A并再次写入B,从而导致重复。

  3. 最后,在分布式环境中,应用程序将崩溃或(更糟糕!)暂时失去与系统其余部分的连接。通常,新实例会自动启动以替换被认为丢失的实例。通过此过程,我们可能有多个实例处理相同的输入主题并写入相同的输出主题,从而导致输出重复,并且违反了一次处理语义的方式。我们称此为“僵尸实例”的问题。

我们在Kafka中设计了交易API,以解决第二和第三个问题。通过使这些循环成为原子的并促进僵尸防护,事务可以在读写过程中实现一次精确的处理。

问题:

  1. 上面的第2点和第3点描述了何时可以发生使用事务性API处理的消息重复。事务性API是否还有助于避免任何情况下的消息丢失?

  2. Kafka事务性API的大多数在线示例(例如herehere)涉及:

    while (true)
    {
        ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
        producer.beginTransaction();
        for (ConsumerRecord record : records)
          producer.send(producerRecord(“outputTopic”, record));
        producer.sendOffsetsToTransaction(currentOffsets(consumer), group);  
        producer.commitTransaction();
    }
    

    这基本上是读-写-写循环。那么,事务性API仅在读-写-写循环中有用吗?

  3. [This文章给出了非读写过程中的事务性API的示例:

     producer.initTransactions();
     try {
        producer.beginTransaction();
        producer.send(record1);
        producer.send(record2);
        producer.commitTransaction();
     } catch(ProducerFencedException e) {
       producer.close();
     } catch(KafkaException e) {
       producer.abortTransaction();
     } 
    

    它说:

    这允许生产者将一批消息发送到多个分区,以使该批中的所有消息最终对任何消费者都是可见的,或者对于消费者而言是不可见的。

    此示例是否正确,并显示了使用事务性API的另一种方法,它不同于read-process-write循环? (请注意,它也不会将偏移量提交给事务。)

  4. 在我的应用程序中,我只是使用来自kafka的消息,进行处理并将它们记录到数据库中。那是我的全部流程。

    a。因此,我想这不是read-process-write cycle。 Kafka事务性API是否对我的情况有用?

    b。另外,我需要确保每个消息都只处理一次。我想在生产者中设置idempotent=true就足够了,我不需要事务性的API,对吗?

    c。我可能会运行多个管道实例,但是我没有将处理输出写入Kafka。因此,我猜这将永远不会涉及僵尸(重复的制片人写信给kafka)。因此,我猜想事务性API不会帮助我避免重复处理情况,对吗? (我可能必须在同一数据库事务中同时保留偏移量和处理输出到数据库的信息,并在生产者重新启动期间读取偏移量,以避免重复处理。)

apache-kafka kafka-producer-api kafka-transactions-api
1个回答
0
投票
a。因此,我想这不是读过程写周期。是卡夫卡对我的情况有用的事务性API?
这是一个读过程写操作,除了您要写数据库而不是Kafka。 Kafka拥有自己的事务管理器,因此假设您可以正确恢复使用消费者编写的处理器的状态,那么在等幂的事务内部进行写入将只启用一次处理。您不能对数据库执行此操作,因为数据库的事务管理器不会与Kafka的数据库同步。相反,您可以做的是确保即使kafka事务对于数据库而言不是原子的,它们最终仍将保持一致。

假设您的使用者读取,写入数据库,然后确认。如果数据库失败,则您不会确认,并且可以根据偏移量正常恢复。如果确认失败,则将处理两次并保存到数据库两次。如果您可以使此操作成为幂等,那么您是安全的。这意味着您的处理器必须是纯处理器,并且数据库必须进行重复数据删除:两次处理同一条消息应始终在DB上导致相同的结果。

b。另外,我需要确保每个消息都只处理一次。我想在生产者中设置idempotent = true就足够了,我没有需要交易API,对吧?

假设您尊重点a的要求,那么在不同存储上进行持久化处理之后,恰好也需要在您的初始写入和重复存储之间没有对要保存的对象进行任何其他更改。假设将值写为X,然后其他角色将其更改为Y,然后重新处理消息并将其更改回X。例如,可以通过将数据库表设置为日志来避免这种情况,类似于kafka主题。 。

c。我可能会运行多个管道实例,但是我没有将处理输出写入Kafka。因此,我猜这将永远不会涉及僵尸(重复的制片人写信给kafka)。因此,我猜想事务性API不会帮助我避免重复处理情况,对吗? (我可能必须在同一数据库事务中同时保留偏移量和处理输出到数据库的内容,并在生产者重新启动期间读取偏移量,以避免重复处理。)

这是生产者写的您所消费的主题,它可能会创建僵尸消息。该制作人需要与kafka融洽相处,以便忽略僵尸。事务性API与您的使用者一起将确保此生产者以原子方式编写并且您的使用者以非原子方式读取已提交的消息。如果只想一次幂等就足够了。如果消息是原子编写的,那么您也需要进行事务处理。无论哪种方式,您的读写/消耗生产处理器都必须是纯净的,并且必须进行重复数据删除。您的数据库也是该处理器的一部分,因为该数据库实际上是持久存在的数据库。

我在互联网上找了一些,也许此链接对您有所帮助:processing guarantees

您发布的链接:exactly once semanticstransactions in kafka很棒。

© www.soinside.com 2019 - 2024. All rights reserved.