素材巴巴 > 程序开发 >

java 连接rabbitmq_Java连接RabbitMQ实例

程序开发 2023-09-15 22:00:20

本节演示如何使用Java来连接RabbitMQ,以此来生产和消费消息。生产者发送一条消息“Hello World!”给RabbitMQ服务器,消费者拿到之后进行消费。

1.1 maven依赖

com.rabbitmq

amqp-client

5.7.1

1.2 生产者代码

package com.hys.rabbitmq;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.MessageProperties;

public class RabbitProducer {

private static final String EXCHANGE_NAME = "exchange_demo";

private static final String ROUTING_KEY = "routingkey_demo";

private static final String QUEUE_NAME = "queue_demo";

private static final String IP_ADDRESS = "127.0.0.1";

private static final int PORT = 5672;

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(IP_ADDRESS);

factory.setPort(PORT);

factory.setUsername("root");

factory.setPassword("root");

//创建连接

Connection connection = factory.newConnection();

//创建信道

Channel channel = connection.createChannel();

//创建一个type="direct"、持久化的、非自动删除的交换器

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

//创建一个持久化、非排他的、非自动删除的队列

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

//将交换器与队列通过路由键绑定

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

//发送一条持久化的消息:hello world!

String message = "Hello World!";

channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

//关闭资源

channel.close();

connection.close();

}

}

运行上述代码后,打开RabbitMQ的管理界面,可以看到消息已经存在RabbitMQ里了:

1.3 消费者代码

package com.hys.rabbitmq;

import java.io.IOException;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Address;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Consumer;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

public class RabbitConsumer {

private static final String QUEUE_NAME = "queue_demo";

private static final String IP_ADDRESS = "127.0.0.1";

private static final int PORT = 5672;

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

Address[] addresses = new Address[] { new Address(IP_ADDRESS, PORT) };

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("root");

factory.setPassword("root");

//这里的连接方式与生产者的demo略有不同,注意辨别区别

//创建连接

Connection connection = factory.newConnection(addresses);

//创建信道

final Channel channel = connection.createChannel();

//设置客户端最多接受未被ack的消息的个数

channel.basicQos(64);

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body) throws IOException {

System.out.println("recv message:" + new String(body));

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

channel.basicAck(envelope.getDeliveryTag(), false);

}

};

channel.basicConsume(QUEUE_NAME, consumer);

//等待回调函数执行完毕之后,关闭资源

TimeUnit.SECONDS.sleep(5);

channel.close();

connection.close();

}

}

运行结果:

recv message:Hello World!

再次查看RabbitMQ管理界面,可以看到此时消息已经变为了0:

2 Spring Boot连接

2.1 maven依赖

org.springframework.boot

spring-boot-starter-amqp

2.2 application.properties文件

#RabbitMQ配置

spring.rabbitmq.addresses=127.0.0.1

spring.rabbitmq.port=5672

spring.rabbitmq.username=root

spring.rabbitmq.password=root

spring.rabbitmq.virtual-host=/

spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true

spring.rabbitmq.publisher-returns=true

spring.rabbitmq.template.mandatory=true

spring.rabbitmq.listener.simple.acknowledge-mode=manual

2.3 配置类代码

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.core.env.Environment;

/**

* RabbitMQ配置类

* @author Robert Hou

* @date 2019年7月1日

*/

@Configuration

public class RabbitMQConfig {

/**

* EXCHANGE名称

*/

public static final String FANOUT_EXCHANGE = "test.fanout";

public static final String DIRECT_EXCHANGE = "test.direct";

public static final String TOPIC_EXCHANGE = "test.topic";

/**

* QUEUE名称

*/

public static final String FANOUT_QUEUE = "test.fanout.queue";

public static final String DIRECT_QUEUE = "test.direct.queue";

public static final String TOPIC_QUEUE = "test.topic.queue";

/**

* ROUTINGKEY名称

*/

public static final String DIRECT_ROUTINGKEY = "direct";

public static final String TOPIC_ROUTINGKEY = "topic.#";

@Bean

public ConnectionFactory connectionFactory(Environment environment) {

String addresses = environment.getProperty("spring.rabbitmq.addresses");

int port = environment.getProperty("spring.rabbitmq.port", Integer.class);

String username = environment.getProperty("spring.rabbitmq.username");

String password = environment.getProperty("spring.rabbitmq.password");

String virtualHost = environment.getProperty("spring.rabbitmq.virtual-host");

CachingConnectionFactory connectionFactory = new CachingConnectionFactory(addresses, port);

connectionFactory.setUsername(username);

connectionFactory.setPassword(password);

connectionFactory.setVirtualHost(virtualHost);

connectionFactory.setPublisherConfirms(true);

return connectionFactory;

}

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange(FANOUT_EXCHANGE, true, false);

}

@Bean

public DirectExchange directExchange() {

return new DirectExchange(DIRECT_EXCHANGE, true, false);

}

@Bean

public TopicExchange topicExchange() {

return new TopicExchange(TOPIC_EXCHANGE, true, false);

}

@Bean

public Queue fanoutQueue() {

return new Queue(FANOUT_QUEUE, true);

}

@Bean

public Queue directQueue() {

return new Queue(DIRECT_QUEUE, true);

}

@Bean

public Queue topicQueue() {

return new Queue(TOPIC_QUEUE, true);

}

@Bean

public Binding fanoutBinding() {

return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());

}

@Bean

public Binding directBinding() {

return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);

}

@Bean

public Binding topicBinding() {

return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(TOPIC_ROUTINGKEY);

}

}

2.4 生产者代码

import java.util.Map;

import java.util.UUID;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;

import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageHeaders;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Service;

import com.hys.springboot.entity.Order;

import com.hys.springboot.service.IRabbitSenderService;

@Service

public class RabbitSenderServiceImpl implements IRabbitSenderService {

private static final Log logger = LogFactory.getLog(RabbitSenderServiceImpl.class);

@Autowired

private RabbitTemplate rabbitTemplate;

final ConfirmCallback CONFIRM_CALLBACK = (correlationData, ack, cause) -> {

if (logger.isDebugEnabled()) {

logger.debug("correlationData:" + correlationData + " ack:" + ack);

}

if (!ack) {

if (logger.isErrorEnabled()) {

logger.error("异常处理");

}

}

};

final ReturnCallback RETURN_CALLBACK = (message, replyCode, replyText, exchange, routingKey) -> {

if (logger.isErrorEnabled()) {

logger.error("replyCode:" + replyCode + " replyText:" + replyText + " exchange:" + exchange + " routingKey:" + routingKey);

}

};

@Override

public void send(String exchange, String routingKey, Object message, Map properties) {

MessageHeaders messageHeaders = new MessageHeaders(properties);

Message msg = MessageBuilder.createMessage(message, messageHeaders);

rabbitTemplate.setConfirmCallback(CONFIRM_CALLBACK);

rabbitTemplate.setReturnCallback(RETURN_CALLBACK);

CorrelationData correlationData = new CorrelationData();

correlationData.setId(UUID.randomUUID().toString() + "-" + System.currentTimeMillis());

rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);

}

@Override

public void sendOrder(String exchange, String routingKey, Order order) {

rabbitTemplate.setConfirmCallback(CONFIRM_CALLBACK);

rabbitTemplate.setReturnCallback(RETURN_CALLBACK);

CorrelationData correlationData = new CorrelationData();

correlationData.setId(UUID.randomUUID().toString() + "-" + System.currentTimeMillis());

rabbitTemplate.convertAndSend(exchange, routingKey, order, correlationData);

}

}

第一个方法发送到的是message,而第二个方法发送的则是自定义的Java Bean:Order。

测试的发送代码如下所示:

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

import com.hys.springboot.config.RabbitMQConfig;

import com.hys.springboot.entity.Order;

import com.hys.springboot.service.IRabbitSenderService;

@RunWith(SpringRunner.class)

@SpringBootTest

public class ApplicationTests {

@Autowired

private IRabbitSenderService rabbitSenderService;

@Test

public void testSender1() {

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

Map properties = new HashMap<>();

properties.put("number", "12345");

properties.put("send_time", sdf.format(new Date()));

rabbitSenderService.send(RabbitMQConfig.TOPIC_EXCHANGE, "topic.user", "Hello World", properties);

}

@Test

public void testSender2() {

Order order = new Order();

order.setId("001");

order.setName("订单一");

rabbitSenderService.sendOrder(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.DIRECT_ROUTINGKEY, order);

}

}

2.5 消费者代码

import java.io.IOException;

import java.util.Map;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.core.ExchangeTypes;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.amqp.support.AmqpHeaders;

import org.springframework.messaging.Message;

import org.springframework.messaging.handler.annotation.Headers;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Service;

import com.hys.springboot.config.RabbitMQConfig;

import com.hys.springboot.entity.Order;

import com.hys.springboot.service.IRabbitReceiverService;

import com.rabbitmq.client.Channel;

@Service

public class RabbitReceiverServiceImpl implements IRabbitReceiverService {

private static final Log logger = LogFactory.getLog(RabbitReceiverServiceImpl.class);

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = RabbitMQConfig.TOPIC_QUEUE, durable = "true"), exchange = @Exchange(value = RabbitMQConfig.TOPIC_EXCHANGE, durable = "true", type = ExchangeTypes.TOPIC), key = RabbitMQConfig.TOPIC_ROUTINGKEY))

@RabbitHandler

@Override

public void receiveTopicMessage(Message message, Channel channel) throws IOException {

if (logger.isDebugEnabled()) {

logger.debug("消费端Payload:" + message.getPayload());

}

Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

//手工ack

channel.basicAck(deliveryTag, false);

}

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = RabbitMQConfig.DIRECT_QUEUE, durable = "true"), exchange = @Exchange(value = RabbitMQConfig.DIRECT_EXCHANGE, durable = "true", type = ExchangeTypes.DIRECT), key = RabbitMQConfig.DIRECT_ROUTINGKEY))

@RabbitHandler

@Override

public void receiveOrderTopicMessage(@Payload Order order, Channel channel, @Headers Map headers) throws IOException {

if (logger.isDebugEnabled()) {

logger.debug("消费端Payload:" + order);

}

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

//手工ack

channel.basicAck(deliveryTag, false);

}

}

0b1331709591d260c1c78e86d0c51c18.png


标签:

上一篇: Java中生成指定长度验证码 下一篇:
素材巴巴 Copyright © 2013-2021 http://www.sucaibaba.com/. Some Rights Reserved. 备案号:备案中。