Passing those headers works fine with SQS, but it does not work with SNS and fails with this error:
Caused by: com.amazonaws.services.sns.model.InvalidParameterException: Invalid parameter: The MessageGroupId parameter is required for FIFO topics (Service: AmazonSNS; Status Code: 400; Error Code: InvalidParameter; Request ID: 1aa83814-abc8-56e9-ae15-619723438fe9; Proxy: null)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819) ~[aws-java-sdk-core-1.11.933.jar:na]
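Do I have to change the headers, or is there another way around this?
I found a way by not using Spring's SDK wrapper. FIFO support in SNS is new and Spring has not implemented a solution for it yet, so I could not find a way to pass the parameters to the topic through Spring. The following link helped me solve it:
https://docs.aws.amazon.com/sns/latest/dg/fifo-topic-code-examples.html
This is how my method ended up: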
private final String topicArn;
private final AmazonSNS amazonSNS;
private final ObjectMapper objectMapper;

public void send(final T payload, final Object groupId) {
    try {
        amazonSNS.publish(new PublishRequest()
                .withTopicArn(topicArn)
                // A random ID means two publishes are never treated as duplicates.
                .withMessageDeduplicationId(UUID.randomUUID().toString())
                .withMessage(objectMapper.writeValueAsString(payload))
                // Required for FIFO topics.
                .withMessageGroupId(groupId.toString()));
    } catch (final IOException e) {
        // Jackson's JsonProcessingException is an IOException.
        throw new RuntimeException(e);
    }
}
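The method above uses a random UUID as the deduplication ID, so a retried publish of the same payload is never recognized as a duplicate. Here is a minimal sketch of one alternative, assuming that identical serialized bodies should count as duplicates: derive the ID from a SHA-256 hash of the message body. The names `DeduplicationIds` and `fromBody` are hypothetical and not part of the AWS SDK.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of the AWS SDK or Spring Cloud AWS.
final class DeduplicationIds {
    private DeduplicationIds() {}

    /** Returns the lowercase hex SHA-256 digest of the serialized message body. */
    static String fromBody(String body) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(body.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                // Mask to avoid sign extension, then pad to two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a required algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }
}
```

The result is 64 hex characters and could be passed to `withMessageDeduplicationId(...)` in place of the random UUID, for example `DeduplicationIds.fromBody(objectMapper.writeValueAsString(payload))`.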
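If you do not want to upgrade your spring-cloud-aws, as discussed here: https://github.com/spring-attic/spring-cloud-aws/issues/714, you can do the following:
Create a handler that intercepts the message before it is sent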
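import com.amazonaws.Request;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import java.util.Map;

public class SnsFifoHandler extends RequestHandler2 {

    @Override
    public void beforeRequest(Request<?> request) {
        super.beforeRequest(request);
        if (!(request.getOriginalRequest() instanceof PublishRequest)) {
            return;
        }
        PublishRequest publishRequest = (PublishRequest) request.getOriginalRequest();
        String topicArn = publishRequest.getTopicArn();
        // FIFO topic names end in ".fifo"; topicArn is null when publishing to a target ARN or phone number.
        if (topicArn == null || !topicArn.endsWith(".fifo")) {
            return;
        }
        // Copy the FIFO values from the message attributes into real request parameters.
        copyAttribute(request, publishRequest.getMessageAttributes(), "MessageGroupId");
        copyAttribute(request, publishRequest.getMessageAttributes(), "MessageDeduplicationId");
    }

    // Skips missing attributes instead of failing with a NullPointerException.
    private static void copyAttribute(Request<?> request, Map<String, MessageAttributeValue> attributes, String name) {
        MessageAttributeValue value = attributes == null ? null : attributes.get(name);
        if (value != null && value.getStringValue() != null) {
            request.addParameter(name, value.getStringValue());
        }
    }
}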
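The handler has to decide whether a publish targets a FIFO topic. FIFO topic names end in `.fifo`, so a suffix check on the ARN is stricter than a substring match, which would also accept a standard topic whose name merely contains "fifo". Here is a minimal sketch of that check. The helper name `SnsTopicArns` and the example ARNs are hypothetical.

```java
// Hypothetical helper, not part of the AWS SDK.
final class SnsTopicArns {
    private SnsTopicArns() {}

    /** Returns true only when the ARN is non-null and its topic name ends in ".fifo". */
    static boolean isFifoTopic(String topicArn) {
        return topicArn != null && topicArn.endsWith(".fifo");
    }
}
```

With this helper, `SnsTopicArns.isFifoTopic("arn:aws:sns:us-east-1:123456789012:fifo-audit")` is false, while the same call for a topic named `orders.fifo` is true.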
@Bean
public SnsFifoHandler snsFifoHandler() {
    return new SnsFifoHandler();
}

@Bean
public AmazonSNSAsync amazonSNSAsync(SnsFifoHandler snsFifoHandler) {
    // Register the handler on the SNS client so it runs before each request is sent.
    return AmazonSNSAsyncClientBuilder.standard()
            .withRegion(region)
            .withRequestHandlers(snsFifoHandler)
            .build();
}
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessagingService {

    @Autowired
    private NotificationMessagingTemplate notificationMessagingTemplate;

    public void sendToFifoTopic(Object message, String topicName, String messageGroupId, String messageDeduplicationId) {
        // The header names must match the attribute names that SnsFifoHandler reads.
        notificationMessagingTemplate.convertAndSend(
                topicName,
                message,
                Map.of("MessageGroupId", messageGroupId, "MessageDeduplicationId", messageDeduplicationId));
    }
}
SnsNotification<Event> notification = SnsNotification.builder(event)
        .groupId("theGroupId")
        .deduplicationId("theDeduplicationId")
        .build();
snsTemplate.sendNotification(snsTopicArn, notification);