素材巴巴 > 程序开发 >

SpringBoot实现多rabbitmq连接

程序开发 2023-09-15 07:58:22

一、配置

1. 配置文件

  rabbitmq:first:host: port: username: password: #虚拟host 可以不设置,使用server默认hostvirtual-host: /second:host: port: username: password: virtual-host: /

2. 配置类

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;/*** RabbitMq多源配置** @author lq*/
 @Configuration
 public class RabbitConfig {@Bean(name = "firstConnectionFactory")@Primarypublic ConnectionFactory firstConnectionFactory(@Value("${spring.rabbitmq.first.host}") String host,@Value("${spring.rabbitmq.first.port}") int port,@Value("${spring.rabbitmq.first.username}") String username,@Value("${spring.rabbitmq.first.password}") String password,@Value("${spring.rabbitmq.first.virtual-host}") String virtualHost) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}@Bean(name = "secondConnectionFactory")public ConnectionFactory secondConnectionFactory(@Value("${spring.rabbitmq.second.host}") String host,@Value("${spring.rabbitmq.second.port}") int port,@Value("${spring.rabbitmq.second.username}") String username,@Value("${spring.rabbitmq.second.password}") String password,@Value("${spring.rabbitmq.second.virtual-host}") String virtualHost) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}@Bean(name = "firstRabbitTemplate")@Primarypublic RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);return firstRabbitTemplate;}@Bean(name = "secondRabbitTemplate")public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory);return secondRabbitTemplate;}@Bean(name = "firstFactory")public SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}@Bean(name = "secondFactory")public SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}
 }

 3.信道构建器

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import java.io.IOException;/*** 信道构建器** @author liuqi*/
 @Configuration
 public class CreateQueue {@Beanpublic String chargeQueue(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {try {connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME, true, false, false, null);}catch (IOException e){e.printStackTrace();}return Constants.RABBITMQ_QUEUE_NAME;}@Beanpublic String chargeQueue2(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {try {connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME2, true, false, false, null);}catch (IOException e){e.printStackTrace();}return Constants.RABBITMQ_QUEUE_NAME2;}
 }

二、发送

1. 创建发送类

import com.alibaba.fastjson.JSONObject;
 import com.google.gson.JsonObject;
 import com.zlhy.websocket.util.constant.Constants;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.tomcat.util.bcel.Const;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.MessageProperties;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.stereotype.Service;import javax.annotation.Resource;
 import java.util.Map;/*** @author liuqi* @version 1.0* @description 向关联方的队列发送消息*/
 @Slf4j
 @Service
 public class SendMessage {@Resource(name = "firstRabbitTemplate")private RabbitTemplate firstRabbitTemplate;@Resource(name = "secondRabbitTemplate")private RabbitTemplate secondRabbitTemplate;public void sendToOneMessage(JSONObject jsonObject) {MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType("application/json");Message info = new Message(jsonObject.toString().getBytes(), messageProperties);firstRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME, info);}public void sendToTwoMessage(JSONObject jsonObject) {MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType("application/json");Message info = new Message(jsonObject.toString().getBytes(), messageProperties);secondRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME2, info);}
 }

2. 调用方法发送数据

sendMessage.sendToOneMessage(jsonResult);

sendMessage.sendToTwoMessage(jsonResult);

三、消费测试

import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.stereotype.Service;import java.io.IOException;
 import java.nio.charset.StandardCharsets;@Slf4j
 @Service
 public class OneReceive {@RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME, containerFactory = "firstFactory")public void listenOne(Message message, Channel channel) throws IOException {//获取MQ返回的数据// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);String data = new String(message.getBody(), StandardCharsets.UTF_8);log.info("MQ1返回的数据:{}", data);//下面进行业务逻辑处理}
 }
import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.stereotype.Service;import java.io.IOException;
 import java.nio.charset.StandardCharsets;@Slf4j
 @Service
 public class TwoReceive {@RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME2, containerFactory = "secondFactory")public void listenTwo(Message message, Channel channel) throws IOException {//获取MQ返回的数据//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);String data = new String(message.getBody(), StandardCharsets.UTF_8);log.info("MQ2返回的数据:{}", data);//下面进行业务逻辑处理}}

也可放到一个类中测试消费


标签:

素材巴巴 Copyright © 2013-2021 http://www.sucaibaba.com/. Some Rights Reserved. 备案号:备案中。