素材巴巴 > 程序开发 >

EMQ服务器学习2 mqtt和springboot的整合实现消息推送和发送

程序开发 2023-09-06 20:35:53

注:本次demo参考博客https://blog.csdn.net/zhangxing52077/article/details/80568244

并在原文的基础上进行了改造,以及加上一些自己对代码设计方面的一些理解。

代码放在了github上 地址:https://github.com/wws11/springboot-mqttdemo

EMQ服务器为我们提供了一个控制面板界面,在本地访问:http://192.168.3.93:18083/ 

默认账户:admn 密码public 当然这些可以配置,需要的自行百度。

登录进来是这样的界面:

默认是英文的界面,可以在下面的seting里进行设置。

后面用到的是这里的websocket栏,通过这个进行接口的测试。

下面是整合springboot的代码

所需要的依赖

org.springframework.bootspring-boot-starter-integrationorg.springframework.integrationspring-integration-streamorg.springframework.integrationspring-integration-mqttorg.projectlomboklombok1.16.18

消息publicsh端代码实现

mqtt发送消息的核心类,创建连接使用了单例的方式,

package com.gysoft.emqdemo.server;import com.gysoft.emqdemo.bean.PushPayload;
 import com.gysoft.emqdemo.util.PropertiesUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.springframework.stereotype.Component;/**** @author 魏文思* @date 2019/11/14$ 15:55$*/
 @Slf4j
 @Component
 public class MqttPushServer {private MqttClient client;private static volatile MqttPushServer mqttPushClient = null;//重连次数private int reConnTimes;public int getReConnTimes() {return this.reConnTimes;}public void setReConnTimes(int reConnTimes) {if (this.isConnected()) {reConnTimes = 0;}this.reConnTimes = reConnTimes;}public int getMaxReconnTimes() {return PropertiesUtil.MQTT_MAXRECONNECTTIMES;}public int getReconnInterval() {return PropertiesUtil.MQTT_RECONNINTERVAL;}public static MqttPushServer getInstance(){if(null == mqttPushClient){synchronized (MqttPushServer.class){if(null == mqttPushClient){mqttPushClient = new MqttPushServer();}}}return mqttPushClient;}private MqttPushServer() {connect();}public void connect(){try {client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENTID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();/*** clean session 值为false,既保留会话,那么该客户端上线的时候,并订阅了主题“r”,那么该主题会一直存在,即使客户端离线,该主题也仍然会记忆在EMQ服务器内存。* 当客户端离线又上线时,仍然会接受到离线期间别人发来的publish消息(QOS=0,1,2).类似及时通讯软件,终端可以接受离线消息。* 除非客户端主动取消订阅主题, 否则主题一直存在。另外,mnesia不会持久化session,subscription和topic,服务器重启则丢失。* 当clean session 为true* 该客户端上线,并订阅了主题“r”,那么该主题会随着客户端离线而删除。* 当客户端离线又上线时,接受不到离线期间别人发来的publish消息** 不管clean session的值是什么,当终端设备离线时,QoS=0,1,2的消息一律接收不到。* 当clean session的值为true,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息一律接收不到。* 当clean session的值为false,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息仍然可以接收到。如果同个主题发了多条就接收多条,一条不差,照单全收**/options.setCleanSession(false);/*options.setUserName(PropertiesUtil.MQTT_USER_NAME);options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());*/options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);try {client.setCallback(new PushCallback());client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布,默认qos为0,非持久化* @param topic* @param pushMessage*/public void publish(String topic,PushPayload pushMessage){publish(0, false, topic, pushMessage);}/*** 发布* @param qos* @param retained* @param topic* @param pushMessage*/public void publish(int qos,boolean retained,String topic,PushPayload pushMessage){MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.toString().getBytes());MqttTopic mTopic = client.getTopic(topic);if(null == mTopic){log.error("topic not exist");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();} catch (Exception e) {e.printStackTrace();}}/*** 订阅某个主题,qos默认为0* @param topic*/public void subscribe(String topic){subscribe(topic,0);}/*** 订阅某个主题* @param topic* @param qos*/public void subscribe(String topic,int qos){try {client.subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}public boolean isConnected() {return client.isConnected();}public static void main(String[] args) throws Exception {String kdTopic = "demo/topics";PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("17637900215").setContent("designModel").bulid();MqttPushServer.getInstance().publish(0, false, kdTopic, pushMessage);}
 }
 

配置类,用于读取mqtt的一些配置,服务器采用本的服务器不需要配置密码,这里我把账号密码都进行了注释

package com.gysoft.emqdemo.util;import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;/*** @author 魏文思* @date 2019/11/14$ 15:56$*/
 public class PropertiesUtil {public static String MQTT_HOST;public static String MQTT_CLIENTID;public static String MQTT_USER_NAME;public static String MQTT_PASSWORD;public static int MQTT_TIMEOUT;public static int MQTT_KEEP_ALIVE;public  static String prefixUrl;/*** 最大重连次数*/public  static int MQTT_MAXRECONNECTTIMES;public static  int MQTT_RECONNINTERVAL;static {MQTT_HOST = loadMqttProperties().getProperty("host");MQTT_CLIENTID = loadMqttProperties().getProperty("clientid");/*  MQTT_USER_NAME = loadMqttProperties().getProperty("username");MQTT_PASSWORD = loadMqttProperties().getProperty("password");*/MQTT_TIMEOUT = Integer.valueOf(loadMqttProperties().getProperty("timeout"));MQTT_KEEP_ALIVE = Integer.valueOf(loadMqttProperties().getProperty("keepalive"));MQTT_MAXRECONNECTTIMES=Integer.valueOf(loadMqttProperties().getProperty("maxReconnectTimes"));MQTT_RECONNINTERVAL=Integer.valueOf(loadMqttProperties().getProperty("reconnInterval"));prefixUrl = String.valueOf(loadMqttProperties().getProperty("prefixUrl"));}private static Properties loadMqttProperties() {InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/application.yml");Properties properties = new Properties();try {properties.load(inputstream);return properties;} catch (IOException e) {throw new RuntimeException(e);} finally {try {if (inputstream != null) {inputstream.close();}} catch (IOException e) {throw new RuntimeException(e);}}}}
 

mqttqos支持的三种类型的枚举类

package com.gysoft.emqdemo.util;/*** @author 魏文思* @date 2019/11/15$ 10:27$*/
 public enum QosType {QOS_AT_MOST_ONCE(0, "最多一次,有可能重复或丢失"),QOS_AT_LEAST_ONCE(1, "至少一次,有可能重复"),QOS_EXACTLY_ONCE(2, "只有一次,确保消息只到达一次");private int number;private String desc;QosType(int num, String desc) {this.number = num;this.desc = desc;}public int getNumber() {return number;}public String getDesc() {return desc;}}
 

推送消息的实体类

package com.gysoft.emqdemo.bean;import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;/*** mqtt 消息推送实体** @author 魏文思* @date 2019/11/14$ 15:52$*/
 @Slf4j
 @Setter
 @Getter
 public class PushPayload {//推送类型private String type;//推送对象private String mobile;//标题private String title;//内容private String content;//数量private Integer badge = 1;//铃声private String sound = "default";public PushPayload(String type, String mobile, String title, String content, Integer badge , String sound){this.type = type;this.mobile = mobile;this.title = title;this.content = content;this.badge = badge;this.sound = sound;}public static class Builder{//推送类型private String type;//推送对象private String mobile;//标题private String title;//内容private String content;//数量private Integer badge = 1;//铃声private String sound = "default";public Builder setType(String type) {this.type = type;return this;}public Builder setMobile(String mobile) {this.mobile = mobile;return this;}public Builder setTitle(String title) {this.title = title;return this;}public Builder setContent(String content) {this.content = content;return this;}public Builder setBadge(Integer badge) {this.badge = badge;return this;}public Builder setSound(String sound) {this.sound = sound;return this;}public PushPayload bulid(){return new PushPayload(type,mobile,title,content,badge,sound);}}public static Builder getPushPayloadBuider(){return new Builder();}@Overridepublic String toString() {return JSON.toJSONString(this, SerializerFeature.DisableCircularReferenceDetect);}}
 

回调函数,用于在接受消息后,重连等一些操作,

messageArrived()方法 当消息到达后做的一些处理,操作
connectionLost()方法用于处理断线重连操作,我这里的实现现不用参考,这个地方我处理的还有点问题,还没有解决门后面解决了在更新。
deliveryComplete()  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
package com.gysoft.emqdemo.server;import com.gysoft.emqdemo.util.NetUtils;
 import com.gysoft.emqdemo.util.PropertiesUtil;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.concurrent.TimeUnit;/*** @author 魏文思* @date 2019/11/14$ 15:57$*/
 public class PushCallback implements MqttCallback {@Overridepublic void connectionLost(Throwable throwable) {/* MqttPushServer mqttPushServer = MqttPushServer.getInstance();//在断开连接时使用,主要用于重连System.out.println("开始判断是否进入重连");do {System.out.println("进入重连");if (NetUtils.connectTest(PropertiesUtil.prefixUrl)) {mqttPushServer.connect();mqttPushServer.setReConnTimes(mqttPushServer.getReConnTimes() + 1);}try {TimeUnit.SECONDS.sleep((long) mqttPushServer.getReconnInterval());} catch (InterruptedException var3) {System.out.println("重连出现异常");var3.printStackTrace();}} while (!mqttPushServer.isConnected() && mqttPushServer.getReConnTimes() < mqttPushServer.getMaxReconnTimes());System.out.println("重试成功!!!");*/}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {//服务端不用关心,客户端的业务// subscribe后得到的消息会执行到这里面/* System.out.println("接收消息主题 : " + topic);System.out.println("接收消息Qos : " + message.getQos());System.out.println("接收消息内容 : " + new String(message.getPayload()));*/}
 }
 

mqtt yml 配置文件


 #mq配置
 com:mqtt:host: tcp://localhost:1883clientid: JavaSampletopic: demo/topicstimeout: 10keepalive: 20maxReconnectTimes: 5reconnInterval: 1prefixUrl: http://192.168.3.93:18083server:port: 9090
 

测试controller,使用main方法测试,这里我将mqtt交给了spring进行维护管理

package com.gysoft.emqdemo.controller;import com.gysoft.emqdemo.bean.PushPayload;
 import com.gysoft.emqdemo.server.MqttPushServer;
 import com.gysoft.emqdemo.util.QosType;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;/*** @author 魏文思* @date 2019/11/15$ 14:36$*/
 @RestController
 public class EMQController {@GetMapping("/sendMessage")public String testMQ() {String kdTopic = "demo/topics";PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("17637900215").setContent("designModel").bulid();/*** mqtt发布消息的时候,可以设置保留消息标志,保留消息会驻留在消息服务器,后来的订阅主题仍然可以接受该消息* 关于retain的说明:* 终端设备publish消息时,如果retain值是true,则会服务器一直记忆,哪怕是服务重启。因为Mnesia会本地持久化。* publish某主题的消息,payload为空且retain值是true,则会删除这条持久化的消息。** publish某主题的消息,payload为空且retain值是false,则不会删除这条持久化的消息。QOS_AT_MOST_ONCE(0, "最多一次,有可能重复或丢失"),QOS_AT_LEAST_ONCE(1, "至少一次,有可能重复"),QOS_EXACTLY_ONCE(2, "只有一次,确保消息只到达一次");*///这里将消息异步处理  使用futuretask,或者使用rabbimq进行异步处理或者spring的异步机制进行处理FutureTask futureTask = new FutureTask(() -> {MqttPushServer.getInstance().publish(QosType.QOS_AT_LEAST_ONCE.getNumber(), true, kdTopic, pushMessage);return true;});ExecutorService service = Executors.newCachedThreadPool();service.submit(futureTask);try {Boolean result = (Boolean) futureTask.get();if (result == true) {System.out.println("消息发送成功");} else {System.out.println("消息推送异常");}} catch (Exception e) {System.out.println("消息推送异常");e.printStackTrace();}return "ok";}}
 

接收端先使用dashboard界面提供的websocket进行测试

首先连接emq服务器

然后再订阅我们的消息指定订阅主题topic

接口访问:多刷新了几次,看看效果

消息推送结果

在最下面可以看到我们从服务端发来的消息

从官网上找了一下客户端java的实现,其实实现和服务端基本一致就是一个订阅,一个发布,订阅和发布相同的主题

package com.gysoft.emqdemo.client;import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;/*** @author 魏文思* @date 2019/11/15$ 17:43$*/
 public class Client {public static void main(String[] args) {String topic        = "demo/topics";String content      = "Message from MqttPublishSample";int qos             = 1;String broker       = "tcp://localhost:1883";String clientId     = "client1";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient sampleClient = new MqttClient(broker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.println("Connecting to broker: "+broker);sampleClient.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: "+content);MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);sampleClient.subscribe("demo/topics",1);sampleClient.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println(mqttMessage);System.out.println();System.out.println(s);}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}});/*  System.out.println("Message published");sampleClient.disconnect();System.out.println("Disconnected");System.exit(0);*/} catch(MqttException me) {System.out.println("reason "+me.getReasonCode());System.out.println("msg "+me.getMessage());System.out.println("loc "+me.getLocalizedMessage());System.out.println("cause "+me.getCause());System.out.println("excep "+me);me.printStackTrace();}}
 }
 

在控制台看到服务端推送的结果:

至此mqtt的推送订阅就已经实现。后面继续加深研究mqtt的其他特性。


标签:

上一篇: 6、字节-负数表示-补码-128计算 下一篇:
素材巴巴 Copyright © 2013-2021 http://www.sucaibaba.com/. Some Rights Reserved. 备案号:备案中。