Spring Boot 实现多 RabbitMQ 连接
程序开发
2023-09-15 07:58:22
一、配置
1. 配置文件
spring:
  rabbitmq:
    first:
      host:
      port:
      username:
      password:
      # 虚拟host 可以不设置,使用server默认host
      virtual-host: /
    second:
      host:
      port:
      username:
      password:
      virtual-host: /
2. 配置类
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * RabbitMQ configuration for two independent brokers ("first" and "second").
 *
 * <p>For each broker this class registers a {@link ConnectionFactory}, a
 * {@link RabbitTemplate} and a {@link SimpleRabbitListenerContainerFactory}.
 * The "first" connection factory and template are marked {@link Primary}, so
 * they are the ones injected wherever no {@link Qualifier} is given.
 *
 * <p>The {@code virtual-host} property is optional for both brokers: when it is
 * missing or blank, the broker's default virtual host ({@code /}) is used.
 *
 * @author lq
 */
@Configuration
public class RabbitConfig {

    /**
     * Connection factory for the first broker, built from
     * {@code spring.rabbitmq.first.*}.
     */
    @Bean(name = "firstConnectionFactory")
    @Primary
    public ConnectionFactory firstConnectionFactory(
            @Value("${spring.rabbitmq.first.host}") String host,
            @Value("${spring.rabbitmq.first.port}") int port,
            @Value("${spring.rabbitmq.first.username}") String username,
            @Value("${spring.rabbitmq.first.password}") String password,
            // ":/" supplies the default virtual host when the property is not configured.
            @Value("${spring.rabbitmq.first.virtual-host:/}") String virtualHost) {
        return newConnectionFactory(host, port, username, password, virtualHost);
    }

    /**
     * Connection factory for the second broker, built from
     * {@code spring.rabbitmq.second.*}.
     */
    @Bean(name = "secondConnectionFactory")
    public ConnectionFactory secondConnectionFactory(
            @Value("${spring.rabbitmq.second.host}") String host,
            @Value("${spring.rabbitmq.second.port}") int port,
            @Value("${spring.rabbitmq.second.username}") String username,
            @Value("${spring.rabbitmq.second.password}") String password,
            // ":/" supplies the default virtual host when the property is not configured.
            @Value("${spring.rabbitmq.second.virtual-host:/}") String virtualHost) {
        return newConnectionFactory(host, port, username, password, virtualHost);
    }

    /** Template that publishes through the first broker's connection factory. */
    @Bean(name = "firstRabbitTemplate")
    @Primary
    public RabbitTemplate firstRabbitTemplate(
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /** Template that publishes through the second broker's connection factory. */
    @Bean(name = "secondRabbitTemplate")
    public RabbitTemplate secondRabbitTemplate(
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Listener container factory bound to the first broker; reference it from
     * {@code @RabbitListener(containerFactory = "firstFactory")}.
     */
    @Bean(name = "firstFactory")
    public SimpleRabbitListenerContainerFactory firstFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        return newListenerFactory(configurer, connectionFactory);
    }

    /**
     * Listener container factory bound to the second broker; reference it from
     * {@code @RabbitListener(containerFactory = "secondFactory")}.
     */
    @Bean(name = "secondFactory")
    public SimpleRabbitListenerContainerFactory secondFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        return newListenerFactory(configurer, connectionFactory);
    }

    /** Builds a caching connection factory from the given broker coordinates. */
    private static ConnectionFactory newConnectionFactory(
            String host, int port, String username, String password, String virtualHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        // A key that is present but empty resolves to a blank string; in that case
        // leave the factory's own default virtual host untouched.
        if (virtualHost != null && !virtualHost.trim().isEmpty()) {
            connectionFactory.setVirtualHost(virtualHost);
        }
        return connectionFactory;
    }

    /** Creates a listener container factory and applies the Boot configurer to it. */
    private static SimpleRabbitListenerContainerFactory newListenerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}
3. 信道构建器
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeoutException;
import java.io.IOException;/*** 信道构建器** @author liuqi*/
@Configuration
public class CreateQueue {@Beanpublic String chargeQueue(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {try {connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME, true, false, false, null);}catch (IOException e){e.printStackTrace();}return Constants.RABBITMQ_QUEUE_NAME;}@Beanpublic String chargeQueue2(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {try {connectionFactory.createConnection().createChannel(false).queueDeclare(Constants.RABBITMQ_QUEUE_NAME2, true, false, false, null);}catch (IOException e){e.printStackTrace();}return Constants.RABBITMQ_QUEUE_NAME2;}
}
二、发送
1. 创建发送类
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import com.zlhy.websocket.util.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.bcel.Const;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Map;

/**
 * Sends messages to the queues of the associated parties, one queue per broker.
 *
 * @author liuqi
 * @version 1.0
 */
@Slf4j
@Service
public class SendMessage {

    @Resource(name = "firstRabbitTemplate")
    private RabbitTemplate firstRabbitTemplate;

    @Resource(name = "secondRabbitTemplate")
    private RabbitTemplate secondRabbitTemplate;

    /**
     * Publishes the JSON object through the first broker's template, using
     * {@code Constants.RABBITMQ_QUEUE_NAME} as the routing key.
     *
     * @param jsonObject payload to send; serialized with {@code toString()}
     */
    public void sendToOneMessage(JSONObject jsonObject) {
        firstRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME, toJsonMessage(jsonObject));
    }

    /**
     * Publishes the JSON object through the second broker's template, using
     * {@code Constants.RABBITMQ_QUEUE_NAME2} as the routing key.
     *
     * @param jsonObject payload to send; serialized with {@code toString()}
     */
    public void sendToTwoMessage(JSONObject jsonObject) {
        secondRabbitTemplate.convertAndSend(Constants.RABBITMQ_QUEUE_NAME2, toJsonMessage(jsonObject));
    }

    /** Wraps the JSON text in an AMQP message with content type {@code application/json}. */
    private static Message toJsonMessage(JSONObject jsonObject) {
        MessageProperties properties = new MessageProperties();
        properties.setContentType("application/json");
        // Encode explicitly as UTF-8 rather than with the platform default charset,
        // so the bytes do not depend on the host's locale settings. The charset is
        // referenced by its fully-qualified name to leave the import list unchanged.
        byte[] body = jsonObject.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        return new Message(body, properties);
    }
}
2. 调用方法发送数据
// Example usage: publish the same payload once through each sender method.
// NOTE(review): `sendMessage` is presumably an injected SendMessage bean and
// `jsonResult` a JSONObject; neither declaration is shown in this snippet.
sendMessage.sendToOneMessage(jsonResult);
sendMessage.sendToTwoMessage(jsonResult);
三、消费测试
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.nio.charset.StandardCharsets;@Slf4j
@Service
public class OneReceive {@RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME, containerFactory = "firstFactory")public void listenOne(Message message, Channel channel) throws IOException {//获取MQ返回的数据// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);String data = new String(message.getBody(), StandardCharsets.UTF_8);log.info("MQ1返回的数据:{}", data);//下面进行业务逻辑处理}
}
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.nio.charset.StandardCharsets;@Slf4j
@Service
public class TwoReceive {@RabbitListener(queues = Constants.RABBITMQ_QUEUE_NAME2, containerFactory = "secondFactory")public void listenTwo(Message message, Channel channel) throws IOException {//获取MQ返回的数据//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);String data = new String(message.getBody(), StandardCharsets.UTF_8);log.info("MQ2返回的数据:{}", data);//下面进行业务逻辑处理}}
也可放到一个类中测试消费
标签:
上一篇:
将angular-cli项目部署到github pages上,在线预览
下一篇:
相关文章
-
无相关信息