免责声明:这是我第一次使用 AWS Kinesis,因此我的预期可能不正确。
我有一个非常简单的 AWS Lambda 函数,可以将数据插入到 Kinesis 中。 API响应表明没有错误,也没有抛出异常; Kinesis 只是确认一切顺利。然而,当我进入AWS控制台并尝试查询数据时,什么也没有!
const records = [
{ partitionKey: '1', data: 'Record 1' },
{ partitionKey: '2', data: 'Record 2' },
{ partitionKey: '3', data: 'Record 3' },
]
const params = {
Records: records.map((record) => ({
Data: record.data,
PartitionKey: record.partitionKey,
})),
StreamName: streamName,
}
const response = await kinesis.putRecords(params).promise()
const suscess = response.FailedRecordCount === 0
suscess
是真的。
因此,我尝试使用同一 Lambda 函数中的代码检索数据。将数据插入 Kinesis 后,我添加了以下代码:
const params2 = {
ShardIteratorType: 'LATEST',
ShardId: 'shardId-000000000000',
StreamName: streamName,
}
const response2 = await kinesis.getShardIterator(params2).promise()
const shardIterator = response2.ShardIterator
const records2 = await kinesis.getRecords({ ShardIterator: shardIterator! }).promise()
records2
是一个空数组。
令我惊讶的是,没有返回任何记录;响应为空,没有检索到任何有用的信息。此外,没有引发任何异常。
我检查了shardId,它确实存在。
所以我的问题是,我做错了什么?
为什么 AWS Kinesis 显示已插入数据,但似乎没有任何反应?在 AWS 控制台中,我可以从使用图表中看到 put 和 get 操作中的活动。
由于您最近开始使用 Kinesis Data Streams,我建议您阅读架构和术语文档。它将让您深入了解数据如何在系统内部传输。
根据 AWS 文档:
Kinesis 数据流是一组分片。每个分片都有一系列数据记录。每个数据记录都有一个由 Kinesis Data Streams 分配的序列号。
基于此定义,Kinesis 数据流在内部使用分片来传输数据。如果您使用“按需”容量模式,它将最初分配至少 4 个分片,并根据应用程序的吞吐量增加分片数量。但是,如果您使用 Provisioned 模式创建 Kinesis Data Streams,则必须至少使用 2 个分片。因此,Kinesis 数据流始终有多个分片来处理数据流。 关于您在消费者端使用的代码,您将分片 ID 值显式硬编码为
shardId-000000000000
,但您无法确定您发布的数据是否已发送到该特定分片 ID。因此,除了检查发布者端的
FailedRecords
计数之外,还记录 putRecords
方法的完整响应,它将向您显示哪个 shardId
用于传输数据。为了简化操作,我使用 AWS CLI 命令来发布数据,您可以轻松地解释/转换为 Node.js 以实现相同的结果。
放置记录命令:
aws kinesis put-records \
--stream-name <ENTER_STREAM_NAME_HERE> \
--records Data="Record 1",PartitionKey="1" Data="Record 2",PartitionKey="2" Data="Record 3",PartitionKey="3" --cli-binary-format raw-in-base64-out
PutRecords 响应:
{
"FailedRecordCount": 0,
"Records": [
{
"SequenceNumber": "49643231014796346134140772956682328171122645057423802450",
"ShardId": "shardId-000000000005"
},
{
"SequenceNumber": "49643231014796346134140772956683537096942259686598508626",
"ShardId": "shardId-000000000005"
},
{
"SequenceNumber": "49643231014796346134140772956684746022761874315773214802",
"ShardId": "shardId-000000000005"
}
]
}
如果您观察上面的响应,当我执行
put-records
命令时,数据被发送到
shardId-000000000005
。假设我之前没有消耗过该分片中的任何记录,因此我将使用 TRIM_HORIZON
作为
ShardIteratorType
来获取数据指针值。GetShardIterator 命令:
aws kinesis get-shard-iterator \
--stream-name <ENTER_STREAM_NAME_HERE> \
--shard-id shardId-000000000005 \
--shard-iterator-type TRIM_HORIZON
GetShardIterator 响应:
{
"ShardIterator": "AAAAAAAAAAH2b4HgeaV/7klnxSTYd3/T9YcQ2eKxjELpkEgXAy1k0hVidh05ZeIUdMBHo0SdJOjBq5HWwGG3dZPCKM8kTBYCWYLhv7OrC9PQo6qdRuhC8uY4LH6GEBenMgf7dzS1wD/oep8EKZvSblDYVCfcpoXT4NbWIt8D5mvx4ZlPssmyuRR92DM0ywU6PjTM8tgOoixD5kEDro/SANFc5ohKIiOHxWjUsfpgvMoJFIFtLpkgQQ=="
}
上述响应包含从分片读取最旧(未修剪)数据记录的指针记录:
shardId-000000000005
。一旦我们有了迭代器值,我们就可以使用
get-records
方法来获取记录。获取记录命令:
aws kinesis get-records \
--shard-iterator AAAAAAAAAAH2b4HgeaV/7klnxSTYd3/T9YcQ2eKxjELpkEgXAy1k0hVidh05ZeIUdMBHo0SdJOjBq5HWwGG3dZPCKM8kTBYCWYLhv7OrC9PQo6qdRuhC8uY4LH6GEBenMgf7dzS1wD/oep8EKZvSblDYVCfcpoXT4NbWIt8D5mvx4ZlPssmyuRR92DM0ywU6PjTM8tgOoixD5kEDro/SANFc5ohKIiOHxWjUsfpgvMoJFIFtLpkgQQ==
获取记录响应:
{
"Records": [
{
"SequenceNumber": "49643231014796346134140772956682328171122645057423802450",
"ApproximateArrivalTimestamp": "2023-08-02T22:15:34.035000+00:00",
"Data": "UmVjb3JkIDE=",
"PartitionKey": "1"
},
{
"SequenceNumber": "49643231014796346134140772956683537096942259686598508626",
"ApproximateArrivalTimestamp": "2023-08-02T22:15:34.038000+00:00",
"Data": "UmVjb3JkIDI=",
"PartitionKey": "2"
},
{
"SequenceNumber": "49643231014796346134140772956684746022761874315773214802",
"ApproximateArrivalTimestamp": "2023-08-02T22:15:34.038000+00:00",
"Data": "UmVjb3JkIDM=",
"PartitionKey": "3"
}
],
"NextShardIterator": "AAAAAAAAAAHIu30Hail1drAR8L9vok/zazMmRawSMqVACRymRKho+06rk6PHZ0G9JbYJLzIjUoo3UVT3XiqcfTL/QO6Dt1SJhY7p2P50V8Dhv2pkGavpNnh43114Mp4i3HAUSsYkwNRW8EJSIcJ/LZysNG1z0KLmbBp+Vau5UOj9mbZu4aU7H+97WqJkoHvK8/BC2AcMnVUlR03/xVHS8zy9fer8v6bRCjDgJMCU9CHyJamX5Douqg==",
"MillisBehindLatest": 0
}
在代码中,您将
ShardIteratorType
值用作
LATEST
,这会在分片中最后发布的记录之后创建一个指针。因此,如果您使用 LATEST
迭代器类型,请确保在发布数据之前先获取迭代器值。您还可以考虑使用其他迭代器类型,如 this文档中所述。 我认为您现在已经确定了代码中的问题,可能存在于两个地方:
shardId
。shardId-0000000000000
发送消息,您也无法检索之前发布的值。