素材巴巴 > 程序开发 >

SpringCloud(6)— RabbitMQ(消息队列)

程序开发 2023-09-04 09:41:52

SpringCloud(6)— RabbitMQ(消息队列)

一 初识MQ

1.同步通信与异步通信

1.同步通信的问题

同步调用的优点在于时效性高,可以立即得到结果

微服务之间基于Feign的调用属于同步方式,存在一些问题

在这里插入图片描述

在这里插入图片描述

2.异步调用方案

异步调用常见的实现就是 事件驱动模式

在这里插入图片描述

异步通信的缺点:

2.MQ介绍

MQ(MessageQueue),中文含义为消息队列,用来存放消息,也就是事件驱动模式中的 Broker

常见的MQ技术包含一下四种:

在这里插入图片描述

二 RabbitMQ的使用

1.安装与运行

RabbitMQ基于Erlang语言开发的开源消息中间件。

RabbitMQ官方地址:Messaging that just works — RabbitMQ

在DockerHub上拉取RabbitMQ的镜像,然后运行。

#1.拉取RabbitMQ
 docker pull rabbitmq:3
 #2.运行RabbitMQ镜像,(15672 为管理UI界面的端口,5672为后期通信接口)
 docker run -e RABBITMQ_DEFAULT_USER=shawn -e RABBITMQ_DEFAULT_PASS=123456 --name rabbitmq3 --hostname myrabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3
 

安装完成之后不能直接进行访问,需要先进入容器内部开启插件才可以访问

#1.进入 rabbitmq 容器内
 docker exec -it myrabbitmq bash
 #2.开启插件
 rabbitmq-plugins enable rabbitmq_management
 #3.如果直接拉取 management 版本的镜像,则无需以上步骤
 docker pull rabbitmq:3-management
 #4.运行 management 版本的rabbit
 docker run -e RABBITMQ_DEFAULT_USER=shawn -e RABBITMQ_DEFAULT_PASS=123456 --name rabbitmq3 --hostname myrabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
 

完成以后即可在浏览器中输入服务器地址+端口号访问了。

在这里插入图片描述

踩坑点:Rabbit的Web UI 中Channel模块无法打开并且提示 Stats in management UI are disabled on this node

是因为默认情况下Rabbit是禁止的。

The reason is that the default image disables metrics collector in the management_agent plugin

原因是默认图像禁用management_agent插件中的度量收集器

# 1.进入容器内部
 docker exec -it myrabbitmq bash
 # 2.切换至配置文件目录下
 cd  /etc/rabbitmq/conf.d/
 # 3.将 management_agent.disable_metrics_collector.conf 文件中的 management_agent.disable_metrics_collector 的值修改为 false
 echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
 # 4.重启 rabbit 容器
 docker restart rabbitmq3
 

2.RabbitMQ概述

在这里插入图片描述

RabbitMQ通过 VirtualHost 进行隔离,相互不可见。

RabbitMQ中的相关概念

三 RabbitMQ消息模型

在这里插入图片描述

1.简单队列(Hello World)

在这里插入图片描述

1.创建项目引入依赖

创建两个项目分别为发送者和订阅者,引入RabbitMQ的maven坐标

org.springframework.bootspring-boot-starter-amqp
 
 

或者在初始化项目时直接勾选RabbitMQ

在这里插入图片描述

2.编写测试代码示例

消息发送者示例代码:publisher

@Test
 void publisher() throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.119.101");factory.setUsername("shawn");factory.setPassword("123456");factory.setPort(5672);factory.setVirtualHost("/");//建立连接Connection connection=factory.newConnection();//创建通道Channel channel=connection.createChannel();//创建队列String queueName="simple.queue";String message="贾君鹏,你妈喊你回家吃饭!";channel.basicPublish("",queueName,null,message.getBytes());System.out.println("发送消息:"+message);channel.close();connection.close();
 }
 

消息接受者示例代码:consumer

@Test
 void consumer() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.119.101");factory.setUsername("shawn");factory.setPassword("123456");factory.setPort(5672);factory.setVirtualHost("/");//创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);//订阅消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 处理消息String message=new String(body,"UTF-8");System.out.println("接受消息:"+message);}});System.out.println("等待接受消息,退出请按 CTRL+C");
 }
 

3.基本消息队列的发送流程

2.SpringAMQP

官方文档:https://spring.io/projects/spring-amqp

在这里插入图片描述

AMQP是消息接受与发送的协议或者标准,与语言和平台无关

1.RabbitTemplate

RabbitTemplate是一个Spring封装的用来发送消息的工具类,类似 RedisCache,RestTemplate,可以简单,高效,优雅的实现消息的发送和接收

引入 spring-boot-starter-amqp 依赖,然后在相关项目(publisher 和 consumer )的配置文件中配置相关参数。

spring:application:name: publisherrabbitmq:host: 192.168.119.101# 默认端口为5672,使用默认端口时可不写port: 5672username: shawnpassword: 123456virtual-host: /
 

在 publisher 项目中编写测试代码,发送消息

@Autowired
 private RabbitTemplate rabbitTemplate;
 @Test
 void publisher() {String queueName="hello.world";String message="Hello,RabbitTemplate";# 这里使用 convertAndSend 进行消息的处理和发送rabbitTemplate.convertAndSend(queueName,message);System.out.println("消息发送完成");
 }
 

在 consumer 项目中编写代码,订阅并且接受消息

/**
 * 使用 Component 将当前类申明为一个 bean
 * 定义一个类,使用 RabbitListener 注解订阅要接受消息的队列
 */
 @Component
 public class SimpleMessageRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleMessage(String message) {System.out.println("接受消息:" + message);}
 }
 

注意:消息一旦消费,就会从消息队列中删除,RabbitMQ没有消息回溯

3.WorkQueue(工作队列)

工作队列模型,多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。

在这里插入图片描述

工作队列,可以提高消息处理速度,避免消息堆积

@RabbitListener(queues = "simple.queue")
 public void listenWorkMessage1(String message) throws InterruptedException {System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now());Thread.sleep(20);
 }@RabbitListener(queues = "simple.queue")
 public void listenWorkMessage2(String message) throws InterruptedException {System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now());Thread.sleep(200);
 }
 

定义两个执行效率不同的消费者,模拟一个简单的工作队列模型,发现消费者并不会因为执行效率的高低自动的增加或处理消息的处理量,而是平均分配。

可以通过消息的欲取机制来控制消息的处理量

欲取机制,即默认情况下,有多少消息就拿多少消息,并不会考虑有多少个消费者。

通过调整 prefetch 属性的值,来控制消费者的消息预取能力

spring:application:name: consumerrabbitmq:host: 192.168.119.101port: 5672username: shawnpassword: 123456virtual-host: /listener:simple:# 通过调整 prefetch 属性的值,来控制消费者的消息预取能力prefetch: 1
 

4.发布订阅模式

发布订阅模式(Publish-Subscribe)的核心是,允许将一个消息发给多个消费者。具体的实现方式是加入了exchange(交换机)。

在这里插入图片描述

注意:exchange只负责消息的转发,而不是存储。路由失败则消息丢失

1.Exchange(交换机)

交换机的作用:

需要使用到的 Bean:FanoutExchange Queue Binding

2.FanoutExchange

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue

在 Consumer 项目中定义队列(queue),交换机(exchange),并且将队列绑定到交换机上。

/**
 示例: 声明一个交换机和两个队列,并且将队列与交换机进行绑定
 */
 @Configuration
 public class FanoutExchangeConfig {/***声明一个FanoutExchange对象,并且添加到bean*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("shawn.exchange");}/***声明一个Queue对象,并且添加到bean*/@Beanpublic Queue fanoutQueue1() {return new Queue("shawn.queue1");}/*** 将队列和Exchange进行绑定*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {// 使用 BindingBuilder 提供的方法进行绑定,最后返回Binding对象return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2() {return new Queue("shawn.queue2");}@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
 }
 

接受消息的监听器示例代码:

@Component
 public class SimpleMessageRabbitListener {@RabbitListener(queues = "shawn.queue1")public void listenWorkMessage1(String message) throws InterruptedException {System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now());Thread.sleep(20);}@RabbitListener(queues = "shawn.queue2")public void listenWorkMessage2(String message) throws InterruptedException {System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now());Thread.sleep(200);}
 }
 

在 Publisher 项目中编写消息发送的测试代码:

@Test
 void testSendExchange() throws InterruptedException {String exChangeName = "shawn.exchange";String message = "贾君鹏,你妈喊你回家吃饭";// 向指定名称的Exchage(交换机)发送消息rabbitTemplate.convertAndSend(exChangeName, "", message);
 }
 

测试结果:两个队列均能收到消息

在这里插入图片描述

3.DirectExchange

DirectExchange会将接受到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)。

在这里插入图片描述

当多个队列使用一个BindingKey时,DirectExchange会将消息发送给所有使用了这个BindingKey的队列

这种情况下,DirectExchang与FanoutExchange相同,也属于广播模式

因此可以认为 DirectExchange 可以模拟 FanoutExchang, 且比 FanoutExchange 灵活

示例:基于RabbitListener 实现 DirectExchange

编写 Consumer 项目的代码:

/**
 * 使用 RabbitListener 声明要绑定的队列、BindingKey、交换机和交换机类型,BindingKey可以设置多个
 */
 @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "shawn.queue1"),exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
 ))
 public void listenWorkMessage1(String message) throws InterruptedException {System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now());Thread.sleep(20);
 }@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "shawn.queue2"),exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
 ))
 public void listenWorkMessage2(String message) throws InterruptedException {System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now());Thread.sleep(200);
 }
 

在 Publuisher 项目中编写测试代码:

@Test
 void testSendDirectExchange() throws InterruptedException {String exChangeName = "direct.exchange";String message = "贾君鹏,你妈喊你回家吃饭";String routingKey="blue";rabbitTemplate.convertAndSend(exChangeName,routingKey, message);
 }
 

此时,只有包含指定的 RoutingKey 的队列,才能收到消息。

4.TopicExchange

TopicExchange 与 DirectExchange 的区别在于,routingKey必须是多个单词的列表,并且使用英文句号( . )分割

在这里插入图片描述

Queue 与 Exchange 进行绑定时支持通配符:

示例:使用 TopicExchange 实现消息发送和接受

在 Consumer 项目中编写示例代码

@Component
 public class TopicExchangeListener {/*** listenWorkMessage1 接收和 china.# 有关的消息*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "shawn.queue1"),exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenWorkMessage1(String message) throws InterruptedException {System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now());Thread.sleep(20);}/*** listenWorkMessage2 接收和 #.news 有关的消息*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "shawn.queue2"),exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenWorkMessage2(String message) throws InterruptedException {System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now());Thread.sleep(200);}
 }
 

在 Publisher 项目中编写测试方法

@Test
 void testSendTopicExchange() throws InterruptedException {String exChangeName = "topic.exchange";String message = "贾君鹏,你妈喊你回家吃饭";// 此处的 routingKey 需要按照TopicExchange的规则编写String routingKey = "china.food";rabbitTemplate.convertAndSend(exChangeName, routingKey, message);
 }
 

定义的交换机和队列信息,均可以在 RabbitMQ Web UI 中看到

在这里插入图片描述

四 消息转换器

在SpringAMQP中,接受消息的类型时Object,也就是说,我们可以发送任何类型的对象给消费者,SpringAMQP会帮助我们进行序列化成字节后发送。

Spring中对消息对象的处理是由 SpringAMQP中的一个名为MessageConvert来处理的。默认实现是SimpleMessageConvert,基于JDK的ObjectOutputStream来完成。

通过重新定义一个 MessageConverter 的 Bean 来修改序列化方式。

推荐使用 JSON 方式序列化,消息体将会更加短小精悍,传输速度更快。

引入 jackson-dataformat-xml 依赖

com.fasterxml.jackson.dataformatjackson-dataformat-xml2.14.1
 
 

定义 Bean

@Configuration
 public class MessageConverterConfig {@Beanpublic Jackson2JsonMessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
 }
 

编写一个传输对象的简单队列测试示例

@Test
 void testSendObjectMessage() {String queueName = "object.queue";SystemLog log = new SystemLog();log.setAddress("陕西省西安市");log.setAge(29);log.setName("shawn");log.setId(1L);log.setPassword("123456");rabbitTemplate.convertAndSend("", queueName, log);
 }
 

此时消费者接受到的消息类型将会是Json格式,将Json信息转换对应的对象就可以拿到对象消息了。

在这里插入图片描述

注意:发送消息和接收消息时注意使用相同的 MessageConverter。可直接将消息转换为发送时的对象(自定义类型需手动转换)

完结撒花。


标签:

素材巴巴 Copyright © 2013-2021 http://www.sucaibaba.com/. Some Rights Reserved. 备案号:备案中。