@SqsListener(value = "${sqs.queue.event.in}", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL)
public void receiveHourEvent(Acknowledgement acknowledgement, String event) {
///do something
}
要正确处理JSON序列化和避免序列化,我在春季配置中配置了一个自定义对象apper:
public class MyConfig {
@Bean
@Primary
public ObjectMapper objectMapper() {
ObjectMapper objectMapper = JsonMappers.getObjectMapper();
// Register the JavaTimeModule to support Java 8 date/time types
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return objectMapper;
}
@Bean
public SqsMessagingMessageConverter sqsMessagingMessageConverter(SqsExtendedClient sqsExtendedClient, ObjectMapper objectMapper) {
ExtendedSqsMessagingMessageConverter converter = new ExtendedSqsMessagingMessageConverter(sqsExtendedClient);
converter.setObjectMapper(objectMapper);
return converter;
}
}
@Bean
SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, SqsMessagingMessageConverter sqsMessagingMessageConverter, MappingJackson2MessageConverter mappingJackson2MessageConverter) {
SqsTemplate sqsTemplate = SqsTemplate.builder()
.sqsAsyncClient(sqsAsyncClient)
.messageConverter(sqsMessagingMessageConverter)
.build();
return sqsTemplate;
}
我确保没有依赖冲突,并且包括Java 8日期/时间支持所需的杰克逊模块:
<!--JSON serialization-->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.17.2</version>
</dependency>
当将消息发送到队列时:
sqsTemplate.send(queueName, event);
该消息是作为自定义对象发送的,但是由于我的侦听器接受字符串,因此无法正确地对即时DateTime字段进行挑选。full错误日志:
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Java 8 date/time type `java.time.Instant` not supported by default: add Module "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling
at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 122] (through reference chain: no.embriq.quant.flow.typelib.common.QFEvent["dateCreated"])
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:256)
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:183)
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:174)
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:57)
at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.convertPayload(AbstractMessagingMessageConverter.java:182)
at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.toMessagingMessage(AbstractMessagingMessageConverter.java:163)
at io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource.convertMessage(AbstractMessageConvertingMessageSource.java:99)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource.convertMessages(AbstractMessageConvertingMessageSource.java:90)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:58)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:177)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$execute$0(MakeAsyncHttpRequestStage.java:110)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:253)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:167)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 date/time type `java.time.Instant` not supported by default: add Module "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling
at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 122] (through reference chain: no.embriq.quant.flow.typelib.common.QFEvent["dateCreated"])
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67)
at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1887)
at com.fasterxml.jackson.databind.deser.impl.UnsupportedTypeDeserializer.deserialize(UnsupportedTypeDeserializer.java:48)
at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:399)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)
at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:342)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4905)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3848)
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:251)
... 45 common frames omitted
如何确保正确注入并用于侦听器中的避难所??
在ObjectMapper中可启用。
将您的objectMapper bean添加到此:
INCLUDE_SOURCE_IN_LOCATION