我有简单的Paho MQTT Android客户端应用程序:
public class PahoExampleActivity extends AppCompatActivity {
MqttAndroidClient mqttAndroidClient;
final String serverUri = "ssl://myserver:8887";
String clientId = "ExampleAndroidClient";
final String subscriptionTopic = "aaa/";
final String publishTopic = "exampleAndroidPublishTopic";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Timber.plant(new Timber.DebugTree());
Timber.plant(new FileLoggingTree(this));
Timber.tag(Utils.TIMBER_TAG).v("starting");
setContentView(R.layout.activity_main);
mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId);
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (reconnect) {
Timber.tag(Utils.TIMBER_TAG).v("Reconnected to : " + serverURI);
// Because Clean Session is true, we need to re-subscribe
subscribeToTopic();
} else {
Timber.tag(Utils.TIMBER_TAG).v("Connected to: " + serverURI);
}
}
@Override
public void connectionLost(Throwable cause) {
Timber.tag(Utils.TIMBER_TAG).v("The Connection was lost.");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Timber.tag(Utils.TIMBER_TAG).v("Incoming message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setKeepAliveInterval(300);
mqttConnectOptions.setUserName("a");
mqttConnectOptions.setPassword("a".toCharArray());
try {
mqttConnectOptions.setSocketFactory(SocketFactoryMQ.getSocketFactory(this,""));
} catch (KeyStoreException e) {
Timber.e ( e);
} catch (NoSuchAlgorithmException e) {
Timber.tag(Utils.TIMBER_TAG).e ( e);
} catch (IOException e) {
Timber.tag(Utils.TIMBER_TAG).e ( e);
} catch (KeyManagementException e) {
Timber.tag(Utils.TIMBER_TAG).e ( e);
} catch (CertificateException e) {
Timber.tag(Utils.TIMBER_TAG).e ( e);
} catch (UnrecoverableKeyException e) {
Timber.tag(Utils.TIMBER_TAG).e ( e);
}
try {
//addToHistory("Connecting to " + serverUri);
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
disconnectedBufferOptions.setBufferEnabled(true);
disconnectedBufferOptions.setBufferSize(100);
disconnectedBufferOptions.setPersistBuffer(false);
disconnectedBufferOptions.setDeleteOldestMessages(false);
mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
subscribeToTopic();
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Timber.tag(Utils.TIMBER_TAG).v("Failed to connect to: " + serverUri);
}
});
} catch (MqttException ex){
ex.printStackTrace();
}
}
public void subscribeToTopic(){
try {
mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Timber.tag(Utils.TIMBER_TAG).v("Subscribed!");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Timber.tag(Utils.TIMBER_TAG).v("Failed to subscribe");
}
});
mqttAndroidClient.subscribe(subscriptionTopic, 0, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// message Arrived!
Timber.tag(Utils.TIMBER_TAG).v("Message: " + topic + " : " + new String(message.getPayload()));
sendNotification(topic,new String(message.getPayload()));
}
});
} catch (MqttException ex){
System.err.println("Exception whilst subscribing");
ex.printStackTrace();
}
}
public void sendNotification(String title, String message) {
...
}
}
当电话连接到充电器或处于活动状态时,一切正常。我的申请小姐手机与充电器断开连接并关闭时的MQTT消息(我称其为电源安全模式)。在这种情况下,一段时间后电话开始丢失MQTT消息。根据从电话中获取的日志,我们可以看到电话唤醒时出现消息:
Thu Feb 20 2020 at 02:41:27:776 pm Message: aaa/ : 2
Thu Feb 20 2020 at 02:41:49:537 pm Message: aaa/ : 3
Thu Feb 20 2020 at 02:44:26:972 pm Message: aaa/ : 2
Thu Feb 20 2020 at 02:44:47:913 pm Message: aaa/ : 3
Thu Feb 20 2020 at 02:45:20:876 pm Message: aaa/ : 4
Thu Feb 20 2020 at 02:46:01:322 pm Message: aaa/ : 5
Thu Feb 20 2020 at 02:46:52:873 pm Message: aaa/ : 6
Thu Feb 20 2020 at 02:47:09:993 pm The Connection was lost.
Thu Feb 20 2020 at 02:54:44:263 pm Reconnected to : ssl://myserver:8887
Thu Feb 20 2020 at 02:54:44:357 pm Subscribed!
Thu Feb 20 2020 at 02:54:48:196 pm MainActivity.onStart
Thu Feb 20 2020 at 02:55:28:465 pm MainActivity.onStop
Thu Feb 20 2020 at 02:55:33:080 pm Message: aaa/ : 12
Thu Feb 20 2020 at 02:57:35:070 pm Message: aaa/ : 13
Thu Feb 20 2020 at 02:58:30:264 pm The Connection was lost.
Thu Feb 20 2020 at 03:02:54:001 pm Reconnected to : ssl://myserver:8887
Thu Feb 20 2020 at 03:02:54:103 pm Subscribed!
消息7-11刚刚没有到达我的设备。如何解决这个问题?
UPD
将QOS 0
更改为QOS 1
后,我收到了所有消息。从10开始的邮件经过一段时间的延迟后同时发送。乍看之下5分钟。延迟无关紧要,但是我不确定延迟可能有多大。例如30分钟。延迟不适当。
Thu Feb 20 2020 at 06:20:54:411 pm Message: aaa/ : 2
Thu Feb 20 2020 at 06:21:16:221 pm Message: aaa/ : 3
Thu Feb 20 2020 at 06:21:48:173 pm Message: aaa/ : 4
Thu Feb 20 2020 at 06:22:29:642 pm Message: aaa/ : 5
Thu Feb 20 2020 at 06:23:21:571 pm Message: aaa/ : 6
Thu Feb 20 2020 at 06:24:23:327 pm Message: aaa/ : 7
Thu Feb 20 2020 at 06:25:34:278 pm Message: aaa/ : 8
Thu Feb 20 2020 at 06:26:56:309 pm Message: aaa/ : 9
Thu Feb 20 2020 at 06:27:16:408 pm The Connection was lost.
Thu Feb 20 2020 at 06:32:27:667 pm MainActivity.onStart
Thu Feb 20 2020 at 06:32:37:320 pm Reconnected to : ssl://myserver:8887
Thu Feb 20 2020 at 06:32:37:390 pm Message: aaa/ : 10
Thu Feb 20 2020 at 06:32:37:442 pm Subscribed!
Thu Feb 20 2020 at 06:32:37:450 pm Message: aaa/ : 11
Thu Feb 20 2020 at 06:32:37:498 pm Message: aaa/ : 12
Thu Feb 20 2020 at 06:32:38:084 pm MainActivity.onStop
Thu Feb 20 2020 at 06:34:02:431 pm Message: aaa/ : 13
您已经在QOS 0处订阅了“即发即弃”主题,将不做任何努力来确保消息确实已传递。
mqttAndroidClient.subscribe(subscriptionTopic, 0, new IMqttMessageListener() {...
如果您希望有保证的交付,则需要使用QOS 1(至少一次)或QOS 2(一次且仅一次)
mqttAndroidClient.subscribe(subscriptionTopic, 1, new IMqttMessageListener() {...