对多个事件源使用 RequestStreamHandler 并在批处理失败时拆分

问题描述 投票:0回答:0

我有一个用例(场景 1),其中我有一个

lambda
kinesis streams
变化中监听
DynamoDB
,如果在处理
kinesis stream
批次中的任何记录时出现错误,我返回
sequence number
返回记录以便可以拆分批次(如 here 所述)并且由于我已经定义了
max retries
,在重试用完后失败的记录转到
SQS
.

我们还计划将此 SQS 作为上述相同 lambda 的事件源(场景 2)。现在的问题是,在

KDS
场景 1)的单个事件源的情况下,我使用了
RequestHandler
并且对于批处理失败,我在
sequence number of failed record
中返回了
StreamEventResponse
。在Scenario 2中,我使用
RequestStreamHandler
因为我有多个事件源(KDS和SQS)并且我在
sequence number of failed record
中编写如下
OutputStream
。然而,看起来
Lambda
并没有将其视为失败并且没有执行任何重试。有没有办法使用 RequestStreamHandler 实现
scenario 2
我可以返回
seq number for lambda to retry

场景 2,其中 KDS 和 SQS 是同一 lambda 的事件源

 public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException
{

      final byte[] byteArray= IOUtils.toByteArray(input);
      String request= new String(byteArray, StandardCharSets.UTF_8);
      KinesisEvent kinesisEvent=new ObjectMapper.readValue(request,KinesisEvent.class);
      String seqNo= kinesisEvent.getRecords().get(0).getKinesis().getSequenceNumber();
      List<StreamEventResponse.BatchItemFailure> batchItemFailures = new ArrayList();
      batchItemFailures.add(new StreamEventResponse.BatchItemFailure(seqNo));
      byte[] failures = SerializationUtils.serialize(new StreamEventResponse(batchItemFailures));

      output.write(failures); 


}
java amazon-web-services amazon-sqs amazon-kinesis
© www.soinside.com 2019 - 2024. All rights reserved.