How do I pass headers to a FIFO SNS topic?

Question    Votes: 0    Answers: 3

Passing those headers works fine with SQS, but with SNS it does not work and produces this error:

Caused by: com.amazonaws.services.sns.model.InvalidParameterException: Invalid parameter: The MessageGroupId parameter is required for FIFO topics (Service: AmazonSNS; Status Code: 400; Error Code: InvalidParameter; Request ID: 1aa83814-abc8-56e9-ae15-619723438fe9; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819) ~[aws-java-sdk-core-1.11.933.jar:na]

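Do I have to change the headers, or is there another way around this?

I found a way by calling the AWS SDK directly instead of going through Spring. Because FIFO support in SNS is new, Spring has not yet implemented a solution for this, and I could not find a way to pass the parameter to the topic through Spring. The following link helped me solve it: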

https://docs.aws.amazon.com/sns/latest/dg/fifo-topic-code-examples.html
amazon-sns spring-messaging
3 Answers
2
votes

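This is how my method ended up:

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.PublishRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.UUID;

// Fields and method of the publishing class; T is that class's payload type parameter.
private final String topicArn;
private final AmazonSNS amazonSNS;
private final ObjectMapper objectMapper;

public void send(final T payload, final Object groupId) {
    try {
        amazonSNS.publish(new PublishRequest()
                .withTopicArn(topicArn)
                // A random ID makes every publish unique, so nothing is deduplicated
                .withMessageDeduplicationId(UUID.randomUUID().toString())
                .withMessage(objectMapper.writeValueAsString(payload))
                // Required for FIFO topics; messages in one group are delivered in order
                .withMessageGroupId(groupId.toString()));
    } catch (final IOException e) {
        // Thrown when the payload cannot be serialized to JSON
        throw new RuntimeException(e);
    }
}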
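Because the method above uses UUID.randomUUID() as the deduplication ID, a retried publish of the same payload gets a new ID and is not treated as a duplicate. If that matters, one option is to derive the ID from the message body instead. The following is only a sketch under that assumption: the class name DeduplicationIds and the method fromPayload are hypothetical, and it uses nothing beyond the JDK. It returns the SHA-256 digest of the serialized payload as a 64-character hex string.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of the AWS SDK or Spring.
final class DeduplicationIds {

    private DeduplicationIds() {
    }

    /** Returns the lowercase hex SHA-256 digest of the serialized payload. */
    static String fromPayload(String serializedPayload) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(serializedPayload.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required to be present on every Java platform
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    public static void main(String[] args) {
        // The same body always yields the same ID, so a retry is recognizable
        String json = "{\"orderId\":42}";
        System.out.println(fromPayload(json).equals(fromPayload(json)));
    }
}
```

In the send method, the result would replace the random value, for example .withMessageDeduplicationId(DeduplicationIds.fromPayload(json)), where json is the serialized payload. The trade-off is that two intentionally identical messages sent within the deduplication window would also be collapsed into one.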

If you don't want to update your spring-cloud-aws, as described in https://github.com/spring-attic/spring-cloud-aws/issues/714, you can do the following:

2
votes

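  1. Create a handler that intercepts the message before it is sent
import com.amazonaws.Request;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import java.util.Map;

public class SnsFifoHandler extends RequestHandler2 {

    @Override
    public void beforeRequest(Request<?> request) {
        super.beforeRequest(request);
        if (!(request.getOriginalRequest() instanceof PublishRequest)) {
            return;
        }
        PublishRequest publishRequest = (PublishRequest) request.getOriginalRequest();
        String topicArn = publishRequest.getTopicArn();
        // FIFO topic names, and therefore their ARNs, end with ".fifo"
        if (topicArn == null || !topicArn.endsWith(".fifo")) {
            return;
        }
        // Copy the FIFO values from the message attributes into real request parameters
        Map<String, MessageAttributeValue> attributes = publishRequest.getMessageAttributes();
        copyAttribute(request, attributes, "MessageGroupId");
        copyAttribute(request, attributes, "MessageDeduplicationId");
    }

    // Skips attributes that are absent instead of failing with a NullPointerException
    private static void copyAttribute(Request<?> request,
                                      Map<String, MessageAttributeValue> attributes,
                                      String name) {
        MessageAttributeValue value = attributes == null ? null : attributes.get(name);
        if (value != null && value.getStringValue() != null) {
            request.addParameter(name, value.getStringValue());
        }
    }
}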

  2. Register it as a Spring bean
@Bean
public SnsFifoHandler snsFifoHandler() {
    return new SnsFifoHandler();
}
  3. Use it to build the AmazonSNSAsync bean instance
@Bean
public AmazonSNSAsync amazonSNSAsync(SnsFifoHandler snsFifoHandler) {
    // region is assumed to be defined elsewhere, e.g. an injected configuration value
    return AmazonSNSAsyncClientBuilder.standard()
            .withRegion(region)
            .withRequestHandlers(snsFifoHandler)
            .build();
}
  4. Finally, send the message
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessagingService {

    @Autowired
    private NotificationMessagingTemplate notificationMessagingTemplate;

    public void sendToFifoTopic(Object message, String topicName, String messageGroupId, String messageDeduplicationId) {
        notificationMessagingTemplate.convertAndSend(
                topicName,
                message,
                Map.of("MessageGroupId", messageGroupId, "MessageDeduplicationId", messageDeduplicationId));
    }
}
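The core of the handler above is a small mapping: for a FIFO topic, read MessageGroupId and MessageDeduplicationId from the message attributes and turn them into request parameters. The sketch below reproduces that mapping with plain JDK types so it can be tried without the AWS SDK. The class FifoParameters and its methods are hypothetical names, the attributes are simplified to a Map<String, String>, and the sketch assumes a missing group ID should be rejected early rather than left for SNS to report.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, SDK-free illustration of the handler's attribute-to-parameter mapping.
final class FifoParameters {

    private FifoParameters() {
    }

    /** FIFO topic names, and therefore their ARNs, end with ".fifo". */
    static boolean isFifoTopic(String topicArn) {
        return topicArn != null && topicArn.endsWith(".fifo");
    }

    /** Returns the extra request parameters to add for the given topic and attributes. */
    static Map<String, String> extract(String topicArn, Map<String, String> attributes) {
        Map<String, String> parameters = new LinkedHashMap<>();
        if (!isFifoTopic(topicArn)) {
            return parameters; // standard topics need no extra parameters
        }
        String groupId = attributes.get("MessageGroupId");
        if (groupId == null || groupId.isEmpty()) {
            throw new IllegalArgumentException("MessageGroupId is required for FIFO topics");
        }
        parameters.put("MessageGroupId", groupId);
        // Optional when the topic has content-based deduplication enabled
        String deduplicationId = attributes.get("MessageDeduplicationId");
        if (deduplicationId != null) {
            parameters.put("MessageDeduplicationId", deduplicationId);
        }
        return parameters;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = Map.of("MessageGroupId", "orders");
        System.out.println(extract("arn:aws:sns:us-east-1:123456789012:orders.fifo", attributes));
    }
}
```

Checking the ".fifo" suffix is stricter than testing whether the ARN contains "fifo" anywhere, which would also match a standard topic whose name happens to include that word.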
I solved it with the following:
SnsNotification<Event> notification = SnsNotification.builder(event)
        .groupId("theGroupId")
        .deduplicationId("theDeduplicationId")
        .build();

snsTemplate.sendNotification(snsTopicArn, notification);

0
votes
