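I'm new to Java and to the Spring framework as a whole. I'm trying to build a Spring Boot application for use in an MQTT context.
The application consists of two microservices: the first retrieves data from the broker and saves it to a database; the second exposes endpoints that let users retrieve certain data and perform operations on it.
The problem is in the first microservice, called DataRetriever, which has to read weather messages (written in JSON) from a topic (weather-data) on a Mosquitto broker, parse them according to context, and then save the JSON fields to the correct tables in Postgres.
Basically, I can't find the right way to feed the message payload from the topic to the object mapper.
The whole logic I've built is based on two files:
MqttBeans
where I handle the connection logic for the channels to and from the broker. The code is: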
package com.example.demo.configs;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * Setting up MQTT Client connection configurations
 * the Client Factory
 * the Channels: Inbound + Outbound
 * the Message Handler
 */
@Configuration
public class MqttBeans {

    // Client Factory (factory configs)
    // Declared as a @Bean so the inbound and outbound adapters share one factory instance
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        // Options Settings
        options.setServerURIs(new String[] {"tcp://localhost:1883"});
        //options.setUserName("postgres");
        //String password = "12345678";
        //options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    // Inbound Ch (Subscribing)
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("serverIn", mqttClientFactory(), "#");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // Msg Handler
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // Topic
                String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); // retrieving the topic from the message header
                if (topic.equals("weather-data")) {
                    System.out.println("Here's the topic: " + topic); // printing out the topic
                }
                // Payload
                String payload = message.getPayload().toString();
                System.out.println("Here's the payload: " + payload); // printing out any msg that comes in the ch
            }
        };
    }

    // Outbound Ch (Publishing)
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // Msg Handler
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("serverOut", mqttClientFactory());
        messageHandler.setAsync(true); // so that the client will always be up and listening
        messageHandler.setDefaultTopic("weather-data");
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }
}
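A small aside on the handler above: message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString() throws a NullPointerException if a message reaches the channel without that header, for example one sent to mqttInputChannel by something other than the inbound adapter. Below is a minimal sketch of a null-safe lookup. TopicHeaders is a hypothetical helper name, not part of Spring; it takes a plain Map because Spring's MessageHeaders implements Map<String, Object>.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not part of Spring): null-safe header lookup. */
final class TopicHeaders {

    private TopicHeaders() {
        // static helper, no instances
    }

    /**
     * Returns the header value as text, or an empty Optional
     * when the header is missing from the map.
     */
    static Optional<String> topicOf(Map<String, Object> headers, String headerName) {
        return Optional.ofNullable(headers.get(headerName)).map(Object::toString);
    }
}
```

Inside the handler this could be called as TopicHeaders.topicOf(message.getHeaders(), MqttHeaders.RECEIVED_TOPIC), and the message skipped when the result is empty.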
StartupUtility
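which should be responsible for parsing the messages written to the broker topic, mapping them (using the ObjectMapper) to what is defined in my CityEntity, and creating and populating the tables in Postgres (with the data read while parsing the JSON fields), as can be seen from the code in StartupUtility: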
package com.example.demo.startup;

import com.example.demo.entities.CityEntity;
import com.example.demo.repos.CityWeatherRepo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

// Implementing CommandLineRunnerInterface to be init when proj is created
@Component
@Log
public class StartupUtility implements CommandLineRunner {

    // Passing the json as a value to test the parsing logic
    @Value("${demo.json.string}") private String json;

    // Wiring the Inbound ch
    @Autowired
    private MessageChannel mqttInputChannel;

    // Wiring the Repo
    @Autowired private CityWeatherRepo repo;

    @Override
    public void run(String... args) throws Exception {
        // Init Obj Mapper instance
        ObjectMapper mapper = new ObjectMapper();
        // Avoiding failure in case of unrecognized fields during json mapping
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // Getting the json + Converting into the City Class
        CityEntity value = mapper.readValue(json, CityEntity.class); // replace json with payload // input: json in string format, output: OpenAPI/CityEntity (class)
        // Saving
        CityEntity save = repo.save(value);
        // Checking the Saving process
        log.info(" Entity info " + save.toString());
    }
}
The JSON I want to parse is hard-coded in a dedicated property, "demo.json.string", in the application.properties file (I did this to check that at least the parsing logic is correct, and it is). What I want instead is for the JSON to be what is written to the topic (weather-data), like this:
{"coord":{"lon":-0.1257,"lat":51.5085},"weather":[{"id":802,"main":"Clouds","description":"scattered clouds","icon":"03d"}],"base":"stations","main":{"temp":291.47,"feels_like":290.74,"temp_min":289.18,"temp_max":292.64,"pressure":1009,"humidity":53},"visibility":10000,"wind":{"speed":5.14,"deg":140},"clouds":{"all":40},"dt":1714483329,"sys":{"type":2,"id":2075535,"country":"GB","sunrise":1714451607,"sunset":1714504902},"timezone":3600,"id":2643743,"name":"London","cod":200}
{"coord":{"lon":2.3488,"lat":48.8534},"weather":[{"id":804,"main":"Clouds","description":"overcast clouds","icon":"04d"}],"base":"stations","main":{"temp":291.56,"feels_like":291.21,"temp_min":290.03,"temp_max":292.64,"pressure":1012,"humidity":67},"visibility":10000,"wind":{"speed":6.69,"deg":140},"clouds":{"all":100},"dt":1714483519,"sys":{"type":2,"id":2012208,"country":"FR","sunrise":1714451474,"sunset":1714503847},"timezone":7200,"id":2988507,"name":"Paris","cod":200}
{"coord":{"lon":-85.1647,"lat":34.257},"weather":[{"id":502,"main":"Rain","description":"heavy intensity rain","icon":"10d"}],"base":"stations","main":{"temp":289.27,"feels_like":289.29,"temp_min":288.15,"temp_max":290.93,"pressure":1018,"humidity":90},"visibility":10000,"wind":{"speed":1.54,"deg":20},"rain":{"1h":5.05},"clouds":{"all":100},"dt":1714483081,"sys":{"type":2,"id":2038061,"country":"US","sunrise":1714474288,"sunset":1714523036},"timezone":-14400,"id":4219762,"name":"Rome","cod":200}
{"coord":{"lon":13.4105,"lat":52.5244},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":301.22,"feels_like":300.5,"temp_min":299.81,"temp_max":302.53,"pressure":1004,"humidity":34},"visibility":10000,"wind":{"speed":5.66,"deg":150},"clouds":{"all":0},"dt":1714483328,"sys":{"type":2,"id":2011538,"country":"DE","sunrise":1714448171,"sunset":1714501842},"timezone":7200,"id":2950159,"name":"Berlin","cod":200}
{"coord":{"lon":-74.006,"lat":40.7143},"weather":[{"id":701,"main":"Mist","description":"mist","icon":"50d"}],"base":"stations","main":{"temp":286.43,"feels_like":286.14,"temp_min":284.84,"temp_max":287.62,"pressure":1015,"humidity":89},"visibility":9656,"wind":{"speed":7.2,"deg":50},"clouds":{"all":100},"dt":1714483436,"sys":{"type":2,"id":2008101,"country":"US","sunrise":1714470912,"sunset":1714521057},"timezone":-14400,"id":5128581,"name":"New York","cod":200}
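My idea was to inject the channel into StartupUtility and pass the payload (?) to it as an argument, but it doesn't work. Does anyone know how to solve this, or have any beginner-friendly advice on structuring the code differently from a logic standpoint?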
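I dropped the idea of putting my JSON mapping/parsing function in the CommandLineRunner utility and switched to a simpler, more intuitive class named MessageParser, whose code is as follows:
package com.example.demo.utils;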

import com.example.demo.entities.CityEntity;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MessageParser {

    /**
     * NOTE: It is good practice to include in the method name the type of obj that the method will return/handle.
     * In this case, the 'parse' method accepts a message payload and returns a CityEntity.
     * Thereby, the 'CityEntity' obj will then be manipulated by the message handler of the 'MqttBeans' class
     */

    // One shared Obj Mapper instance: ObjectMapper is thread-safe once configured, so it need not be rebuilt per message.
    // Avoiding failure in case of unrecognized fields during json mapping, a workaround may be the 'Mixin' feature from the Jackson pckg
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    // Parsing fn
    public static CityEntity parse(String msg_payload) throws Exception {
        // Retrieving the payload + Converting into the CityEntity class
        return MAPPER.readValue(msg_payload, CityEntity.class);
    }
}
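At the same time, the logic of the MqttBeans class changed as well, more specifically its MessageHandler bean, which now calls the static 'parse' method and thereby uses MessageParser. Here is the change:
[...]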
    // Wiring City repo
    @Autowired
    private CityWeatherRepo repo;

    // Msg Handler
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            // Handling msg, once it's received
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // Topic
                String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); // retrieving the topic from the message header
                if (topic.equals("weather-data")) {
                    System.out.println("The topic related to this message is: \n" + topic); // printing out the topic
                }
                // Payload
                String payload = message.getPayload().toString();
                System.out.println("The message payload is: \n" + payload); // printing out any msg that comes in the ch
                try {
                    // Parsing + Saving
                    CityEntity value = MessageParser.parse(payload);
                    CityEntity save = repo.save(value);
                    // Checking the Saving process
                    log.info("\nEntity infos for the parsed payload: \n" + save.toString());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }
[...]
This way I can split the logic across two separate classes.
If anyone has suggestions on how to do this more efficiently and cleanly, feel free to post them.
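One thing worth noting in the handler: the adapter subscribes to "#", and the topic check only guards the println, so every message on every topic is parsed and saved. Below is a minimal sketch of gating the parse-and-save step on the topic, written in plain Java so it can be tried without Spring. TopicPipeline and PayloadParser are hypothetical names introduced only for illustration; the parser and the sink are passed in as functions, and checked parser exceptions are wrapped in an unchecked one, much like the RuntimeException in the handler.

```java
import java.util.function.Consumer;

/** Hypothetical parser contract; allows checked exceptions so MessageParser::parse would fit. */
@FunctionalInterface
interface PayloadParser<T> {
    T parse(String payload) throws Exception;
}

/** Hypothetical helper: parses and stores only payloads that arrive on one expected topic. */
final class TopicPipeline<T> {

    private final String expectedTopic;
    private final PayloadParser<T> parser;
    private final Consumer<T> sink;

    TopicPipeline(String expectedTopic, PayloadParser<T> parser, Consumer<T> sink) {
        this.expectedTopic = expectedTopic;
        this.parser = parser;
        this.sink = sink;
    }

    /**
     * Parses the payload and hands the result to the sink when the topic matches.
     *
     * @return true if the message was handled, false if it was skipped
     */
    boolean handle(String topic, String payload) {
        if (!expectedTopic.equals(topic)) {
            return false; // some other topic: ignore instead of trying to parse it
        }
        final T parsed;
        try {
            parsed = parser.parse(payload);
        } catch (Exception e) {
            // wrap checked parsing errors in an unchecked exception
            throw new IllegalStateException("Could not parse payload from topic " + topic, e);
        }
        sink.accept(parsed);
        return true;
    }
}
```

In MqttBeans this might be built once, as something like new TopicPipeline<CityEntity>("weather-data", MessageParser::parse, entity -> repo.save(entity)), and handleMessage would then only call handle(topic, payload).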