我有一个用例(场景 1),其中我有一个
lambda
从 kinesis streams
变化中监听 DynamoDB
,如果在处理 kinesis stream
批次中的任何记录时出现错误,我返回sequence number
返回记录以便可以拆分批次(如 here 所述)并且由于我已经定义了 max retries
,在重试用完后失败的记录转到 SQS
.
我们还计划将此 SQS 作为上述相同 lambda 的事件源(场景 2)。现在的问题是,在
KDS
(场景 1)的单个事件源的情况下,我使用了 RequestHandler
并且对于批处理失败,我在 sequence number of failed record
中返回了 StreamEventResponse
。在Scenario 2中,我使用RequestStreamHandler
因为我有多个事件源(KDS和SQS)并且我在sequence number of failed record
中编写如下OutputStream
。然而,看起来 Lambda
并没有将其视为失败并且没有执行任何重试。有没有办法使用 RequestStreamHandler
实现 scenario 2我可以返回
seq number for lambda to retry
?
场景 2,其中 KDS 和 SQS 是同一 lambda 的事件源
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException
{
final byte[] byteArray= IOUtils.toByteArray(input);
String request= new String(byteArray, StandardCharSets.UTF_8);
KinesisEvent kinesisEvent=new ObjectMapper.readValue(request,KinesisEvent.class);
String seqNo= kinesisEvent.getRecords().get(0).getKinesis().getSequenceNumber();
List<StreamEventResponse.BatchItemFailure> batchItemFailures = new ArrayList();
batchItemFailures.add(new StreamEventResponse.BatchItemFailure(seqNo));
byte[] failures = SerializationUtils.serialize(new StreamEventResponse(batchItemFailures));
output.write(failures);
}