素材巴巴 > 程序开发 >

阿里云物联网平台云端采用AMQP方式接入同时订阅与发布消息

程序开发 2023-09-14 06:17:07

工业互联网需要采集设备底层数据进行实时数据展示与状态预警,由于底层硬件设备无法采用http等重量级请求发送数据,工业上常常采用MQTT协议进行数据传输,本次基于阿里云物联网平台进行数据采集,本次主要云端收集信息与数据下发,云端采用AMQP方式接入,该方式使得云端服务同时具体发布与订阅功能;设备端采用阿里云IOT平台MQTT.fx模拟接入
环境:JDK1.8+maven+springboot
GitHub源码地址:https://github.com/hou296498161/amqp

1.添加以下依赖

        org.springframework.bootspring-boot-starter-webcom.aliyunaliyun-java-sdk-core3.7.1com.aliyunaliyun-java-sdk-iot6.11.0org.msgpackmsgpack0.6.12commons-codeccommons-codec1.10org.apache.qpidqpid-jms-client0.47.0
 

1)、阿里云创建产品,定义自定义topic,类型为发布订阅
2)、创建产品、
3)、创建服务端订阅
在这里插入图片描述

2、服务端订阅消息

package com.ali.amqp.subscribe;import org.apache.commons.codec.binary.Base64;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionListener;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 import javax.jms.*;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import java.net.URI;
 import java.util.Hashtable;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;/*** 云端订阅消息*/
 public class AmqpJavaClient {private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClient.class);//业务处理异步线程池,线程池参数可以根据您的业务特点调整;或者您也可以用其他异步方式处理接收到的消息private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(50000));public static void subscribe() throws Exception {//参数说明参见阿里云文档:AMQP客户端接入说明。String accessKey = "你的阿里云accessKey";String accessSecret = "你的阿里云 accessSecret";//消费组IDString consumerGroupId = "DEFAULT_GROUP";long timeStamp = System.currentTimeMillis();//签名方法:支持hmacmd5,hmacsha1和hmacsha256String signMethod = "hmacsha1";//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。String clientId = UUID.randomUUID().toString().replaceAll("-","");//UserName组装方法,请参见阿里云文档:AMQP客户端接入说明。String userName = clientId + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + accessKey+ ",consumerGroupId=" + consumerGroupId+ "|";//password组装方法,请参见上一篇文档:AMQP客户端接入说明。String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;String password = doSign(signContent,accessSecret, signMethod);//按照qpid-jms的规范,组装连接URL。String connectionUrl = "failover:(amqps://1090243284576461.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"+ "?failover.reconnectDelay=30";Hashtable hashtable = new Hashtable<>();hashtable.put("connectionfactory.SBCF",connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");Destination queue = (Destination)context.lookup("QUEUE");// Create ConnectionConnection connection = cf.createConnection(userName, password);((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);// Create Session// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// Create Receiver LinkMessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);}private static MessageListener messageListener = new MessageListener() {@Overridepublic void onMessage(Message message) {try {if (message.getBody(byte[].class)==null){logger.error("收到一条空的消息,topic->{}",message.getStringProperty("topic"));}else {logger.info("收到消息:topic->{}:{}",message.getStringProperty("topic"),new String(message.getBody(byte[].class)));}//1.收到消息之后一定要ACK// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(() ->System.out.println(""));} catch (Exception e) {logger.error("submit task occurs exception ", e);}}};private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {/*** 连接成功建立*/@Overridepublic void onConnectionEstablished(URI remoteURI) {logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);}/*** 尝试过最大重试次数之后,最终连接失败。*/@Overridepublic void onConnectionFailure(Throwable error) {logger.error("onConnectionFailure, {}", error.getMessage());}/*** 连接中断。*/@Overridepublic void onConnectionInterrupted(URI remoteURI) {logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);}/*** 连接中断后又自动重连上。*/@Overridepublic void onConnectionRestored(URI remoteURI) {logger.info("onConnectionRestored, remoteUri:{}", remoteURI);}@Overridepublic void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Overridepublic void onSessionClosed(Session session, Throwable cause) {}@Overridepublic void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Overridepublic void onProducerClosed(MessageProducer producer, Throwable cause) {}};/*** password签名计算方法,请参见阿里云文档:AMQP客户端接入说明。*/private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);Mac mac = Mac.getInstance(signMethod);mac.init(signingKey);byte[] rawHmac = mac.doFinal(toSignString.getBytes());return Base64.encodeBase64String(rawHmac);}
 }

3服务端发布

package com.ali.amqp.publish;import com.ali.amqp.subscribe.AmqpJavaClient;
 import com.aliyuncs.DefaultAcsClient;
 import com.aliyuncs.IAcsClient;
 import com.aliyuncs.iot.model.v20180120.PubRequest;
 import com.aliyuncs.iot.model.v20180120.PubResponse;
 import com.aliyuncs.profile.DefaultProfile;
 import org.apache.commons.codec.binary.Base64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;import java.io.*;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;/*** 云端下发消息*/
 public class PopPubServer {private final static Logger logger = LoggerFactory.getLogger(PopPubServer.class);public static boolean sendToTopic(String topic) throws UnsupportedEncodingException {String regionId = "cn-shanghai";String accessKey = "你的accessKey";String accessSecret = "你的accessSecret";final String productKey = "设备productKey";//设置client的参数DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKey, accessSecret);IAcsClient autoClient = new DefaultAcsClient(profile);PubRequest request = new PubRequest();request.setQos(0);//设置发布消息的topicrequest.setTopicFullName("/a14UPaWxJCF/test_device/user/test");request.setProductKey(productKey);//设置消息的内容,一定要用base64编码,否则乱码Base64 base64 = new Base64();request.setMessageContent(base64.encodeToString("hello".getBytes("UTF-8")));try {PubResponse response = autoClient.getAcsResponse(request);Boolean success = response.getSuccess();return success;} catch (Exception e) {logger.warn("阿里云消息发送异常,topic:{},异常信息:{}",topic,e.getMessage());return false;}}
 }
 

标签:

素材巴巴 Copyright © 2013-2021 http://www.sucaibaba.com/. Some Rights Reserved. 备案号:备案中。