AWS Kinesis 似乎是为与 Lambda 一起使用而设计的,其分片的概念与带宽和延迟相关 - 但我试图从非 Lambda 服务中读取它,这是一个长期运行的进程,最终会调用
get_records()
循环。
但显然需要一些手动工作才能正确读取所有分片,包括处理重新分片和检查点,我认为这些工作记录不足。我的问题是:如何从单个长期运行的 Python boto3 客户端正确读取 Kinesis 流中的所有分片并处理所有特殊步骤?
AWS Kinesis 似乎是为与 Lambda 一起使用而设计的。
Kinesis 比 Lambda 早一年。
我的问题是:如何从单个长期运行的 Python boto3 客户端正确读取 Kinesis 流中的所有分片并处理所有特殊步骤?
简要大纲是这样的:
SequenceNumber
的属性。它是一个可以转换为十进制数的字符串。它保证在单个分片内是唯一的,但不能跨分片。考虑到所有这些限制,您的应用程序应该这样做:
ListShards
获取流中现有分片的列表。
首先调用
GetShardIterator
,然后按顺序循环调用 GetRecords
,从分片中读取记录。每个 GetRecords
的结果是分片的一批消息、下一个迭代器令牌,以及可能的子分片列表,当当前分片耗尽时返回。
GetShardIterator
,请使用 AFTER_SEQUENCE_NUMBER
并提供您上次处理的检查点序列号(见下文)。AT_SEQUENCE_NUMBER
以及 ListShards
返回的开始 id。GetRecords
可能不会返回任何记录,这是正常的。如果是这种情况,您可能需要在下一次通话之前引入延迟。
处理
GetRecords
返回的消息。如果某些消息无法处理,则由您的应用程序决定如何定义重试逻辑。 Kinesis 思想假设消息排序很重要,因此您可以逐条处理消息,即使它们是在一批中返回的。如果多次重试后仍无法处理某条消息,则可能需要放弃该消息并继续前进。
处理消息后,您应该检查您在分片中的位置。这是一种奇特的说法,您应该在某处写下分片 ID 和最后处理的消息的序列号。它可以是数据库、磁盘上的文件、键值存储或类似的东西。如果您的应用程序死机,下次重新启动时,它将仅从最后一个检查点位置开始处理分片。您可能希望更少地执行检查点,而不是每条消息执行一次,只要您可以在应用程序终止并重新启动的罕见情况下进行双重处理即可。
一旦分片耗尽(只有在拆分或合并时才会发生),这意味着您已经处理了它的最后一条记录,您应该将此事实记录在检查点存储中,以便可以将其从处理列表中删除。
只要您介意约束,如何并行化分片之间的负载取决于您的应用程序。如果你有两个分片1和2,合并生成分片3,那么1和2可以并行处理,但是3只能在1和2之后处理。这是这个业务中最难的部分。您可能想从这里获得一些实施它的灵感。
AWS 有一款专门为此设计的产品,称为 KCL(Kinesis 客户端库)。
不幸的是,它严格依赖 DynamoDB 作为检查点存储,并且只能与 Java 代码本机集成(尽管它有多种方法可以通过调用以任何语言编写的子进程来处理消息并通过标准进程流与它们进行通信)。