Commit 与 offset/message 是否已被应用程序拓扑的整个处理节点集处理,而仅取决于提交间隔,是否正确?换句话说,在典型的 kafka 消费者应用程序中,当消息被完全处理时会提交,而不是仅获取,在 Kafka 流中,简单地获取就足以让提交间隔启动并提交该消息/偏移量?也就是说,即使那个offset/message还没有被应用拓扑的整组处理节点处理过?
或者消息是否有资格提交,基于拓扑的整个处理节点集处理了它们,并且它们准备好在主题或外部系统中出去。从某种意义上说,这个问题可以概括为,偏移量/消息何时有资格在 Kafka 流中提交?是有条件的吗?如果是的话,条件是什么?
Topology
可能包含多个子拓扑(https://docs.confluence.io/current/streams/architecture.html#stream-partitions-and-tasks )。子拓扑通过主题相互连接。 如果一条记录被子拓扑完全处理,则可以提交该记录。对于这种情况,记录的中间输出在提交之前被写入连接两个子拓扑的主题中。下游子拓扑将从“连接主题”中读取并提交该主题的偏移量。
承诺确实仅基于
commit.interval.ms
发生。如果一次获取返回 100 条记录(偏移量 0 到 99),并且当
commit.interval.ms
命中时,子拓扑会处理 30 条记录,Kafka Streams 将首先确保这 30 条消息的输出刷新到 Kafka(即,
Producer.flush()
),然后提交偏移量
30
——其他 70 条消息位于 Kafka Streams 的内部缓冲区中,并将在提交后进行处理。如果缓冲区为空,则将发送新的提取。每个线程独立跟踪
commit.interval.ms
,并且如果提交间隔过去,将提交其所有任务。因为提交是在子拓扑的基础上进行的,所以可能会提交输入主题记录,而输出主题还没有结果数据,因为中间结果还没有被下游子拓扑处理。
您可以通过
Topology#describe()
检查程序的结构,看看您的程序有哪些子拓扑。
commit.interval.ms
只是定义提交之间的最短时间,即较大的值意味着不会在每次轮询中发生提交。这意味着,只要您不生成额外的线程,您就只会为已完全处理的消息提交偏移量,无论处理涉及什么。