这是和RabbitMQ(基本使用)文章 一起使用
18、RabbitMQ和Spring的整合
1、 重要的类
RabbitAdmin可以很好的处理交换机和队列,修改,删除并且可以注解到spring里
RabbitTemplate类负责收发消息
2、创建SpringBoot工程
<!-- 添加rabbitmq的基本依赖 -->
<dependency>
<groupid>com.rabbitmq</groupid>
<artifactid>amqp-client<artifactid>
</dependency>
<!-- Spring对rabbitmq支持的依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
<!-- SpringBoot对rabbitmq支持的依赖 -->
<dependency>
<groupid>com.springframework.boot<groupid>
<artifactid>spring-boot-starter-amqp<artifactid>
</dependency>
版本使用boot默认
3、创建一个配置类
package com.study.rabbitmq;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 武帅
* @date 2020/5/31 10:04
* @description
*/
//等价于配置文件(***.xml)
@Configuration
public class ConfigurationRabbitmq {
//连接工厂
//ConnectionFactory 注意不再是rabbit里的了,而是spring里面的
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("localhost:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
//管理交换机和队列的RabbitAdmin
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
}
4、RabbitAdmin声明队列交换机并进行绑定
package com.study.rabbitmq;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
void test测试产生队列交换机进行绑定(){
//需要导入这个类 import org.springframework.amqp.core.Exchange;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否自动删除
//第四个参数 携带的头信息 这里暂时没有
Exchange exchange = new DirectExchange("交换机spring短信",true,false,null);
//创建交换机
rabbitAdmin.declareExchange(exchange);
//需要导入这个类 import org.springframework.amqp.core.Queue;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否独占 (一般不使用独占,除非是特殊业务)
//第四个参数 是否自动删除
Queue queue = new Queue("队列spring短信",true,false,false);
//创建队列
rabbitAdmin.declareQueue(queue);
//需要引入这个包 import org.springframework.amqp.core.Binding;
//第一个参数 需要绑定的队列名字
//第二个参数 绑定的类型 (是一个枚举)
//第三个参数 需要绑定的交换机名字
//第四个参数 路由键 (这里是直连型交换机。路由键必须保持一致)
//第五个参数 携带的头信息 这里暂时没有
Binding binding = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信","发送短信",null);
//进行绑定
rabbitAdmin.declareBinding(binding);
}
}
5、RabbitAdmin创建四种类型的交换机并绑定到一个队列
代码如下:
package com.study.rabbitmq;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
void test测试产生队列交换机进行绑定(){
//创建直连型交换机
//需要导入这个类 import org.springframework.amqp.core.Exchange;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否自动删除
//第四个参数 携带的头信息 这里暂时没有
Exchange exchangeDirect = new DirectExchange("交换机spring短信",true,false,null);
//把上面的值进行入参
rabbitAdmin.declareExchange(exchangeDirect);
//创建fanout型交换机
//需要导入这个类 import org.springframework.amqp.core.Exchange;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否自动删除
//第四个参数 携带的头信息 这里暂时没有
Exchange exchangeFanout = new FanoutExchange("交换机spring短信-fanout",true,false,null);
//把上面的值进行入参
rabbitAdmin.declareExchange(exchangeFanout);
//创建topic型交换机
//需要导入这个类 import org.springframework.amqp.core.Exchange;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否自动删除
//第四个参数 携带的头信息 这里暂时没有
Exchange exchangeTopicStar = new TopicExchange("交换机spring短信-topic",true,false,null);
//把上面的值进行入参
rabbitAdmin.declareExchange(exchangeTopicStar);
//创建header型交换机
//需要导入这个类 import org.springframework.amqp.core.Exchange;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否自动删除
//第四个参数 携带的头信息 这里暂时没有
Exchange exchangeHeader = new HeadersExchange("交换机spring短信-header",true,false,null);
//把上面的值进行入参
rabbitAdmin.declareExchange(exchangeHeader);
//创建队列 这里就创建了一个队列和4个交换机进行绑定
//需要导入这个类 import org.springframework.amqp.core.Queue;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否独占 (一般不使用独占,除非是特殊业务)
//第四个参数 是否自动删除
Queue queue = new Queue("队列spring短信",true,false,false);
//把上面的值传递过来
rabbitAdmin.declareQueue(queue);
//使用直连型交换机和队列进行绑定
//需要引入这个包 import org.springframework.amqp.core.Binding;
//第一个参数 需要绑定的队列名字
//第二个参数 绑定的类型 (是一个枚举)
//第三个参数 需要绑定的交换机名字
//第四个参数 路由键 (这里是直连型交换机。路由键必须保持一致)
//第五个参数 携带的头信息 这里暂时没有
Binding binding = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信","发送短信",null);
//把上面的参数传递过来
rabbitAdmin.declareBinding(binding);
//使用fanout型交换机和队列进行绑定
//需要引入这个包 import org.springframework.amqp.core.Binding;
//第一个参数 需要绑定的队列名字
//第二个参数 绑定的类型 (是一个枚举)
//第三个参数 需要绑定的交换机名字
//第四个参数 路由键 (这里是fanout型交换机。没有路由键也可以)
//第五个参数 携带的头信息 这里暂时没有
Binding bindingFanout = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-fanout","",null);
//把上面的参数传递过来
rabbitAdmin.declareBinding(bindingFanout);
//使用topic型交换机和队列进行绑定
//需要引入这个包 import org.springframework.amqp.core.Binding;
//第一个参数 需要绑定的队列名字
//第二个参数 绑定的类型 (是一个枚举)
//第三个参数 需要绑定的交换机名字
//第四个参数 路由键 (这里是topic型交换机)
// .*代表没有,或者一个(不能多个)
//第五个参数 携带的头信息 这里暂时没有
Binding bindingTopic = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-topic","0086.*",null);
//把上面的参数传递过来
rabbitAdmin.declareBinding(bindingTopic);
//header型交换机的参数 这里的all表示除去第一个参数,其余的都得有和正确才行, any表示有一个就可以
Map<String,Object> map = new HashMap<>();
map.put("x-match","all");
map.put("userName","华为");
map.put("passWord","123456");
//使用header型交换机和队列进行绑定
//需要引入这个包 import org.springframework.amqp.core.Binding;
//第一个参数 需要绑定的队列名字
//第二个参数 绑定的类型 (是一个枚举)
//第三个参数 需要绑定的交换机名字
//第四个参数 路由键 (这里是header型交换机。没有路由键也可以)
//第五个参数 携带的头信息 这里暂时没有
Binding bindingHeader = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-header","",map);
//把上面的参数传递过来
rabbitAdmin.declareBinding(bindingHeader);
}
}
6、RabbitAdmin清空和删除队列API
代码如下:
//测试删除队列和清除队列消息 Purge : 清除
@Test
void testDeleteQueuesAndPurgeQueueMessage(){
//清除队列消息 只有这个队列存在和队列名字正确,才可以清除,系统不报错
rabbitAdmin.purgeQueue("队列topic*");
//删除队列
rabbitAdmin.deleteQueue("队列spring短信");
//删除交换机
rabbitAdmin.deleteExchange("交换机spring短信");
rabbitAdmin.deleteExchange("交换机spring短信-fanout");
rabbitAdmin.deleteExchange("交换机spring短信-topic");
rabbitAdmin.deleteExchange("交换机spring短信-header");
}
7、使用Bean注解创建交换机和队列绑定
也就是使用Spring创建交换机和队列
代码如下:
@Configuration
public class ConfigurationRabbitmq //这个类里多出来的代码
//使用Bean注解创建队列 也就是使用Spring创建队列
@Bean
public Queue queueSMS(){
//创建队列
//需要导入这个类 import org.springframework.amqp.core.Queue;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否独占 (一般不使用独占,除非是特殊业务)
//第四个参数 是否自动删除
Queue queue = new Queue("队列spring短信",true,false,false);
return queue;
}
//使用Bean注解创建直连型交换机 也就是使用Spring创建直连型交换机
//如果嫌这种入参方式麻烦,也可以使用链式编程,连续...的方式
//queueSMS()表示队列的名字
//exchangeDirect()表示交换机的名字
//"发送短信" 表示 路由键
//and(null); null 表示附加的值
//Binding binding = BindingBuilder.bind(queueSMS()).to(exchangeDirect()).with("发送短信").and(null);
@Bean
public Exchange exchangeDirect(){
//创建直连型交换机
//需要导入这个类 import org.springframework.amqp.core.Exchange;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否自动删除
//第四个参数 携带的头信息 这里暂时没有
Exchange exchangeDirect = new DirectExchange("交换机spring短信",true,false,null);
return exchangeDirect;
}
//使用Bean注解创建fanout交换机 也就是使用Spring创建fanout交换机
@Bean
public Exchange exchangeFanout(){
//创建fanout型交换机
//需要导入这个类 import org.springframework.amqp.core.Exchange;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否自动删除
//第四个参数 携带的头信息 这里暂时没有
Exchange exchangeFanout = new FanoutExchange("交换机spring短信-fanout",true,false,null);
return exchangeFanout;
}
//使用Bean注解创建topic交换机 也就是使用Spring创建topic交换机
@Bean
public Exchange exchangeTopicStar(){
//创建topic型交换机
//需要导入这个类 import org.springframework.amqp.core.Exchange;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否自动删除
//第四个参数 携带的头信息 这里暂时没有
Exchange exchangeTopicStar = new TopicExchange("交换机spring短信-topic",true,false,null);
return exchangeTopicStar;
}
//使用Bean注解创建header交换机 也就是使用Spring创建header交换机
@Bean
public Exchange exchangeHeader(){
//创建header型交换机
//需要导入这个类 import org.springframework.amqp.core.Exchange;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否自动删除
//第四个参数 携带的头信息 这里暂时没有
Exchange exchangeHeader = new HeadersExchange("交换机spring短信-header",true,false,null);
return exchangeHeader;
}
//使用Bean注解绑定直连型交换机
@Bean
public Binding bindingDirect(){
Binding binding = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信","发送短信",null);
return binding;
}
//使用Bean注解绑定fanout型交换机
@Bean
public Binding bindingFanout(){
Binding bindingFanout = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-fanout","",null);
return bindingFanout;
}
//使用Bean注解绑定topic型交换机
@Bean
public Binding bindingTopic(){
Binding bindingTopic = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-topic","0086.*",null);
return bindingTopic;
}
//使用Bean注解绑定header型交换机
@Bean
public Binding bindingHeader(){
//header型交换机的参数 这里的all表示除去第一个参数,其余的都得有和正确才行, any表示有一个就可以
Map<String,Object> map = new HashMap<>();
map.put("x-match","all");
map.put("userName","华为");
map.put("passWord","123456");
Binding bindingHeader = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-header","",map);
return bindingHeader;
}
测试类里的代码:
@Autowired
private Binding bindingDirect;
@Autowired
private Binding bindingFanout;
@Autowired
private Binding bindingTopic;
@Autowired
private Binding bindingHeader;
//使用Bean注解创建交换机和队列绑定 也就是使用Spring创建交换机和队列
@Test
void testDefineQueueExchangeBindingBySpring(){
rabbitAdmin.declareBinding(bindingDirect);
rabbitAdmin.declareBinding(bindingFanout);
rabbitAdmin.declareBinding(bindingTopic);
rabbitAdmin.declareBinding(bindingHeader);
}
8、RabbitTemplate使用convertAndSend方法发送消息
配置类的代码:
//创建一个可以收发消息的模板 (这个平时用的比较多)
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
测试类代码:
@SpringBootTest
class TestRabbitmqTemplate {
@Autowired
private RabbitTemplate rabbitTemplate;
//使用RabbitMq模板给交换机发送消息 使用convertAndSend()方法
@Test
void TestSimpleSend(){
//第一个参数 交换机的名字
//第二个参数 路由键 (因为是直连型交换机,所以路由键必须一致)
//第三个参数 发送的具体信息 (这里不用使用 .getBytes()方法了)
//也可以使用第4个参数 携带的头信息
String sendSMS = "直连型交换机发送短信";
rabbitTemplate.convertAndSend("交换机spring短信", "发送短信", sendSMS, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().getHeaders().put("登录次数",2);
return message;
}
});
}
}
9、RabbitTemplate使用Send方法发送消息
代码如下:
//使用RabbitMq模板给交换机发送消息 使用send()方法
@Test
void testSend(){
String sendSMS = "使用send()方法来发送短信";
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
properties.getHeaders().put("登录次数",3);
Message message = new Message(sendSMS.getBytes(),properties);
rabbitTemplate.send("交换机spring短信-fanout","",message);
}
//使用RabbitMq模板给交换机发送消息 使用send()方法
@Test
void testSendTopicAndHeaders(){
//给Topic交换机上发送消息
String sendSMSTopic = "使用send()方法来发送短信---Topic型交换机";
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
properties.getHeaders().put("登录次数",4);
Message message = new Message(sendSMSTopic.getBytes(),properties);
rabbitTemplate.send("交换机spring短信-topic","0086.18734910102",message);
//给Header交换机上发送消息
String sendSMSHeader = "使用send()方法来发送短信---Header型交换机";
MessageProperties propertiesHeader = new MessageProperties();
propertiesHeader.setContentEncoding("UTF-8");
propertiesHeader.getHeaders().put("userName","华为");
propertiesHeader.getHeaders().put("passWord","123456");
propertiesHeader.getHeaders().put("登录次数",5);
Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);
rabbitTemplate.send("交换机spring短信-header","",messageHeaders);
}
10、RabbitTemplate使用Receive方法接收消息
package com.study.rabbitmq.p02TestRabbitmqTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Map;
/**
* @author 武帅
* @date 2020/6/5 09:26
* @description
*/
@SpringBootTest
class TestRabbitmqTemplateReceive {
@Autowired
private RabbitTemplate rabbitTemplate;
//使用RabbitMq模板接收消息
@Test
void testReceive(){
//一般不怎么使用while循环true 这里只是为了测试用下
while (true){
//第一个参数 交换机的名字
//第二个参数 毫秒数 如果2秒中内,没有消息就退出这个消息
Message message = rabbitTemplate.receive("队列spring短信", 2000);
if(message == null){
break;
}
String str = new String(message.getBody());
System.out.println("收到了" + str);
//获取头信息
Map<String, Object> headers = message.getMessageProperties().getHeaders();
//遍历头信息
for (Map.Entry<String, Object> entry : headers.entrySet()) {
System.out.println(entry);
}
}
}
}
11、SimpleMessageListenerContainer基本属性和标识策略, 使用MessageListener接收并处理消息
要在SpringBoot下启动,如果在测试单元下,则会出现问题
package com.study.rabbitmq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
/**
* @author 武帅
* @date 2020/6/5 11:14
* @description 消费者的配置文件
*/
@Configuration
public class ConfigurationConsumer {
//使用Bean注解创建队列 也就是使用Spring创建队列
@Bean
public Queue queueXiaoMi(){
//创建队列
//需要导入这个类 import org.springframework.amqp.core.Queue;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否独占 (一般不使用独占,除非是特殊业务)
//第四个参数 是否自动删除
Queue queue = new Queue("队列spring-监听器-小米",true,false,false);
return queue;
}
//使用Bean注解创建队列 也就是使用Spring创建队列
@Bean
public Queue queueHongMi(){
//创建队列
//需要导入这个类 import org.springframework.amqp.core.Queue;
//第一个参数 名字(最好是英文)见名知意
//第二个参数 是否要持久化到本地磁盘中
//第三个参数 是否独占 (一般不使用独占,除非是特殊业务)
//第四个参数 是否自动删除
Queue queue = new Queue("队列spring-监听器-红米",true,false,false);
return queue;
}
private Integer num = 0;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
//把工厂传进来
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置多个监听队列
simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());
//1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
simpleMessageListenerContainer.setConcurrentConsumers(1);
//创建多少个监听器,来监听消息队列
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置收到消息后的确认模式 AcknowledgeMode.AUTO表示为自动模式
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置标签,然后台更加的认识它们
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Override
public String createConsumerTag(String consumerTag) {
//自己手写改动过后的
consumerTag = consumerTag + "-" + num++;
return consumerTag;
//默认返回的就是,rabbitMq自动给咱们起好的名字
//return consumerTag;
}
});
//设置消息监听器
simpleMessageListenerContainer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//获取出消息,在打印
String oneData = new String(message.getBody());
System.out.println( "收到了消息:" + oneData);
//获取信息的自定义内容
Map<String, Object> headers = message.getMessageProperties().getHeaders();
//遍历获取信息的自定义内容
for (Map.Entry<String, Object> entry : headers.entrySet()) {
System.out.println(entry);
}
}
});
return simpleMessageListenerContainer;
}
}
使用生产者生产数据,测试数据
//使用RabbitMq模板给交换机发送消息 使用send()方法
@Test
void testSendTopicAndHeaders(){
//给Topic交换机上发送消息
String sendSMSTopic = "使用send()方法来发送短信---Topic型交换机";
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
properties.getHeaders().put("登录次数",4);
Message message = new Message(sendSMSTopic.getBytes(),properties);
rabbitTemplate.send("交换机spring短信-topic","0086.18734910102",message);
//给Header交换机上发送消息
String sendSMSHeader = "使用send()方法来发送短信---Header型交换机";
MessageProperties propertiesHeader = new MessageProperties();
propertiesHeader.setContentEncoding("UTF-8");
propertiesHeader.getHeaders().put("userName","华为");
propertiesHeader.getHeaders().put("passWord","123456");
propertiesHeader.getHeaders().put("登录次数",5);
Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);
rabbitTemplate.send("交换机spring短信-header","",messageHeaders);
}
12、使用ChannelAwareMessageListener接收消息-手动确认消息
//测试手动接受消息确认
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
//把工厂传进来
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置多个监听队列
simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());
//1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
simpleMessageListenerContainer.setConcurrentConsumers(1);
//创建多少个监听器,来监听消息队列
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置收到消息后的确认模式 AcknowledgeMode.MANUAL表示手动接收消息
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置标签,然后台更加的认识它们
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Override
public String createConsumerTag(String consumerTag) {
//自己手写改动过后的
consumerTag = consumerTag + "-" + num++;
return consumerTag;
//默认返回的就是,rabbitMq自动给咱们起好的名字
//return consumerTag;
}
});
//设置消息监听器
simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取出消息,在打印
String oneData = new String(message.getBody());
System.out.println( "收到了消息:" + oneData);
//获取信息的自定义内容
Map<String, Object> headers = message.getMessageProperties().getHeaders();
//遍历获取信息的自定义内容
for (Map.Entry<String, Object> entry : headers.entrySet()) {
System.out.println(entry);
}
//对消息进行了手动的确认 第二个参数表示:是否批量(这里我们是一条一条处理,所以不用批量)
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//也可以使用这种方法 nack增加了重回队列的参数,最后一个false代表不用重回
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
});
return simpleMessageListenerContainer;
}
使用生产者生产数据,测试数据
//使用RabbitMq模板给交换机发送消息 使用send()方法
@Test
void testSendTopicAndHeaders(){
//给Topic交换机上发送消息
String sendSMSTopic = "使用send()方法来发送短信---Topic型交换机";
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
properties.getHeaders().put("登录次数",4);
Message message = new Message(sendSMSTopic.getBytes(),properties);
rabbitTemplate.send("交换机spring短信-topic","0086.18734910102",message);
//给Header交换机上发送消息
String sendSMSHeader = "使用send()方法来发送短信---Header型交换机";
MessageProperties propertiesHeader = new MessageProperties();
propertiesHeader.setContentEncoding("UTF-8");
propertiesHeader.getHeaders().put("userName","华为");
propertiesHeader.getHeaders().put("passWord","123456");
propertiesHeader.getHeaders().put("登录次数",5);
Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);
rabbitTemplate.send("交换机spring短信-header","",messageHeaders);
}
13、 使用MessageListenerAdapter类进行消息接收并处理 消息监听适配器(在实际中运用的比较多)
使用MessageListenerAdapter处理器进行消息队列监听处理,如果容器没有设置setDefaultListenerMethod,则处理器中默认的处理方法名是handleMessage,如果设置了setDefaultListenerMethod,则处理器中处理消息的方法名就是setDefaultListenerMethod方法参数设置的值。也可以通过setQueueOrTagToMethodName方法为不同的队列设置不同的消息处理方法
自定义消息处理类,注意方法名和参数类型必须要一致
首先自己先定义一个MessageDelegate
package com.study.rabbitmq.p10TestSpringListenerAdaper;
/**
* @author 武帅
* @date 2020/6/7 10:55
* @description
*/
public class MyMessageDelegate {
public void handleMessage(byte[] body){
String strMessage = new String(body);
System.out.println("我收到了:" + strMessage);
}
}
然后再自己写监听适配器对消息进行接受和消费
//使用监听适配器对消息进行接受和消费
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
//把工厂传进来
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置多个监听队列
simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());
//1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
simpleMessageListenerContainer.setConcurrentConsumers(1);
//创建多少个监听器,来监听消息队列
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置收到消息后的确认模式 AcknowledgeMode.AUTO 表示对消息进行自动确认
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置标签,然后台更加的认识它们
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Override
public String createConsumerTag(String consumerTag) {
//自己手写改动过后的
consumerTag = consumerTag + "-" + num++;
return consumerTag;
//默认返回的就是,rabbitMq自动给咱们起好的名字
//return consumerTag;
}
});
//使用MessageListenerAdapter类进行消息接收并处理
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
//new MyMessageDelegate() 这里使用自己的方法
messageListenerAdapter.setDelegate(new MyMessageDelegate());
//进行绑定
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
}
发送数据
//使用RabbitMq模板给交换机发送消息 使用send()方法
@Test
void testSendTopicAndHeaders(){
//给Topic交换机上发送消息
String sendSMSTopic = "使用send()方法来发送短信---Topic型交换机";
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
properties.getHeaders().put("登录次数",4);
Message message = new Message(sendSMSTopic.getBytes(),properties);
rabbitTemplate.send("交换机spring短信-topic","0086.18734910102",message);
//给Header交换机上发送消息
String sendSMSHeader = "使用send()方法来发送短信---Header型交换机";
MessageProperties propertiesHeader = new MessageProperties();
propertiesHeader.setContentEncoding("UTF-8");
propertiesHeader.getHeaders().put("userName","华为");
propertiesHeader.getHeaders().put("passWord","123456");
propertiesHeader.getHeaders().put("登录次数",5);
Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);
rabbitTemplate.send("交换机spring短信-header","",messageHeaders);
}
==注意:这里和上面的使用方法为:必须是先使用测试单元,添加数据(添加之前先注释自己写好的@Bean),然后再解除注释,然后再次启动SpringBoot==
14、MessageListenerAdapter自定义方法接收消息-绑定队列和处理方法
自定义方法接收消息的代码:
//自定义的处理消息的方法名称
public void myHandleMessage(byte[] body){
String strMessage = new String(body);
System.out.println("自定义名称的方法 我收到了:" + strMessage);
}
配置类的代码
//使用监听适配器对消息进行接受和消费
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
//把工厂传进来
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置多个监听队列
simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());
//1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
simpleMessageListenerContainer.setConcurrentConsumers(1);
//创建多少个监听器,来监听消息队列
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置收到消息后的确认模式 AcknowledgeMode.AUTO 表示对消息进行自动确认
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置标签,然后台更加的认识它们
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Override
public String createConsumerTag(String consumerTag) {
//自己手写改动过后的
consumerTag = consumerTag + "-" + num++;
return consumerTag;
//默认返回的就是,rabbitMq自动给咱们起好的名字
//return consumerTag;
}
});
//使用MessageListenerAdapter类进行消息接收并处理
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
//new MyMessageDelegate() 这里使用自己的方法
messageListenerAdapter.setDelegate(new MyMessageDelegate());
//使用自己定义的MessageDelegate
messageListenerAdapter.setDefaultListenerMethod("myHandleMessage");
//进行绑定
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
}
生产者生产的代码
//使用RabbitMq模板给交换机发送消息 使用send()方法
@Test
void testSendTopicAndHeaders(){
//给Topic交换机上发送消息
String sendSMSTopic = "使用send()方法来发送短信---Topic型交换机";
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
properties.getHeaders().put("登录次数",4);
Message message = new Message(sendSMSTopic.getBytes(),properties);
rabbitTemplate.send("交换机spring短信-topic","0086.18734910102",message);
//给Header交换机上发送消息
String sendSMSHeader = "使用send()方法来发送短信---Header型交换机";
MessageProperties propertiesHeader = new MessageProperties();
propertiesHeader.setContentEncoding("UTF-8");
propertiesHeader.getHeaders().put("userName","华为");
propertiesHeader.getHeaders().put("passWord","123456");
propertiesHeader.getHeaders().put("登录次数",5);
Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);
rabbitTemplate.send("交换机spring短信-header","",messageHeaders);
}
绑定队列和处理方法de的代码:
//专用于处理消息队列-小米手机的方法
public void myHandleMessageXiaoMi(byte[] body){
String strMessage = new String(body);
System.out.println("自定义名称的方法-小米 我收到了:" + strMessage);
}
//专用于处理消息队列-红米手机的方法
public void myHandleMessageHongMi(byte[] body){
String strMessage = new String(body);
System.out.println("自定义名称的方法-红米 我收到了:" + strMessage);
}
配置类代码
//使用监听适配器,针对不同队列的消息使用不同的方法,进行处理
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
//把工厂传进来
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置多个监听队列
simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());
//1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
simpleMessageListenerContainer.setConcurrentConsumers(1);
//创建多少个监听器,来监听消息队列
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置收到消息后的确认模式 AcknowledgeMode.AUTO 表示对消息进行自动确认
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置标签,然后台更加的认识它们
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Override
public String createConsumerTag(String consumerTag) {
//自己手写改动过后的
consumerTag = consumerTag + "-" + num++;
return consumerTag;
//默认返回的就是,rabbitMq自动给咱们起好的名字
//return consumerTag;
}
});
//使用MessageListenerAdapter类进行消息接收并处理
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
//new MyMessageDelegate() 这里使用自己的方法
messageListenerAdapter.setDelegate(new MyMessageDelegate());
//这个Map记录了哪个队列使用哪个方法 key为队列的名字 value为方法的名字
Map<String,String> queueMethod = new HashMap<>();
queueMethod.put("队列spring-监听器-小米","myHandleMessageXiaoMi");
queueMethod.put("队列spring-监听器-红米","myHandleMessageHongMi");
//通过队列的名字来和方法的名字进行绑定 (此处使用Map集合)
messageListenerAdapter.setQueueOrTagToMethodName(queueMethod);
//进行绑定
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
}
生产者的代码:
@Test
void testSendToXiaoMiAndHongMi(){
//给Topic交换机上发送消息
String sendSMSTopic = "一部小米手机";
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
Message message = new Message(sendSMSTopic.getBytes(),properties);
rabbitTemplate.send("","队列spring-监听器-小米",message);
//给Header交换机上发送消息
String sendSMSHeader = "一部红米手机";
MessageProperties propertiesHeader = new MessageProperties();
propertiesHeader.setContentEncoding("UTF-8");
Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);
rabbitTemplate.send("","队列spring-监听器-红米",messageHeaders);
}
15、 MessageConverter自定义接收数据类型
MessageConverter自定义接收数据类型的代码:
package com.study.rabbitmq.p10TestSpringListenerAdaper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
/**
* @author 武帅
* @date 2020/6/7 16:18
* @description 自定义文本转换器
* MessageConverter自定义接收数据类型
*
* 类型转换器(实现接口MessageConverter)
*/
public class MyMessageTestConverter implements MessageConverter {
// java对象转换为Message对象
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
Message message = new Message(o.toString().getBytes(),messageProperties);
return message;
}
// Message对象转换为java对象
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(contentType != null && contentType.contains("text")){
String str = new String(message.getBody());
return str;
}else{
//对于不是文本的类型,直接返回字节数组
return message.getBody();
}
}
}
配置类代码:
//使用监听适配器,针对不同队列的消息使用不同的方法,进行处理
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
//把工厂传进来
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置多个监听队列
simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());
//1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
simpleMessageListenerContainer.setConcurrentConsumers(1);
//创建多少个监听器,来监听消息队列
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置收到消息后的确认模式 AcknowledgeMode.AUTO 表示对消息进行自动确认
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置标签,然后台更加的认识它们
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Override
public String createConsumerTag(String consumerTag) {
//自己手写改动过后的
consumerTag = consumerTag + "-" + num++;
return consumerTag;
//默认返回的就是,rabbitMq自动给咱们起好的名字
//return consumerTag;
}
});
//使用MessageListenerAdapter类进行消息接收并处理
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
//设置消息的转换器
messageListenerAdapter.setMessageConverter(new MyMessageTestConverter());
//new MyMessageDelegate() 这里使用自己的方法
messageListenerAdapter.setDelegate(new MyMessageDelegate());
//使用自己的类型转换器
messageListenerAdapter.setDefaultListenerMethod("myHandleMessage");
//进行绑定
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
}
处理方法的参数为转换过的字符串类型的代码:
// 处理方法的参数为转换过的字符串类型
public void myHandleMessage(String str){
System.out.println( "处理方法的参数为转换过的字符串类型" + str);
}
生产者的代码:
@Test
void testSendToXiaoMiAndHongMi(){
//给Topic交换机上发送消息
String sendSMSTopic = "一部小米手机";
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
Message message = new Message(sendSMSTopic.getBytes(),properties);
rabbitTemplate.send("","队列spring-监听器-小米",message);
//给Header交换机上发送消息
String sendSMSHeader = "一部红米手机";
MessageProperties propertiesHeader = new MessageProperties();
propertiesHeader.setContentEncoding("UTF-8");
Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);
rabbitTemplate.send("","队列spring-监听器-红米",messageHeaders);
}
16、添加json支持-转换普通对象为JSON字符串并发送到MQ
在pom.xml中需要添加jackson的引用,才能够实现把自定义的对象转换为json字符串
<!--json支持的依赖-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
实体类代码:
package com.study.rabbitmq.p10TestSpringListenerAdaper;
/**
* @author 武帅
* @date 2020/6/7 16:56
* @description
*/
public class MyOrder {
private Integer id;
private String name;
private float price;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public float getPrice() {
return price;
}
public void setPrice(float price) {
this.price = price;
}
@Override
public String toString() {
return "MyOrder{" +
"id=" + id +
", name='" + name + '\'' +
", price=" + price +
'}';
}
}
配置类代码(暂时先注释一下)
//使用监听适配器,针对不同队列的消息使用不同的方法,进行处理
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
//把工厂传进来
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置多个监听队列
simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());
//1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
simpleMessageListenerContainer.setConcurrentConsumers(1);
//创建多少个监听器,来监听消息队列
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置收到消息后的确认模式 AcknowledgeMode.AUTO 表示对消息进行自动确认
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置标签,然后台更加的认识它们
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Override
public String createConsumerTag(String consumerTag) {
//自己手写改动过后的
consumerTag = consumerTag + "-" + num++;
return consumerTag;
//默认返回的就是,rabbitMq自动给咱们起好的名字
//return consumerTag;
}
});
//使用MessageListenerAdapter类进行消息接收并处理
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
//设置消息的转换器
messageListenerAdapter.setMessageConverter(new MyMessageTestConverter());
//new MyMessageDelegate() 这里使用自己的方法
messageListenerAdapter.setDelegate(new MyMessageDelegate());
//使用自己的类型转换器
messageListenerAdapter.setDefaultListenerMethod("myHandleMessage");
//进行绑定
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
}
测试类代码:
@Test
void testSendOrderToXiaoMi() throws JsonProcessingException {
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
//变成json字符串
properties.setContentType("application/json");
MyOrder myOrder = new MyOrder();
myOrder.setId(1);
myOrder.setName("一部最新的小米手机");
myOrder.setPrice(1999);
//把普通对象变成JSON格式的字符串
ObjectMapper objectMapper = new ObjectMapper();
String jsonStr = objectMapper.writeValueAsString(myOrder);
System.out.println( "====================" + jsonStr);
Message message = new Message(jsonStr.getBytes(), properties);
rabbitTemplate.send("","队列spring-监听器-小米",message);
}
17、Jackson2JsonMessageConverter接收并处理Map类型的消息
配置类代码:
//使用监听适配器,针对不同队列的消息使用不同的方法,进行处理
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
//把工厂传进来
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置多个监听队列
simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());
//1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
simpleMessageListenerContainer.setConcurrentConsumers(1);
//创建多少个监听器,来监听消息队列
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置收到消息后的确认模式 AcknowledgeMode.AUTO 表示对消息进行自动确认
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置标签,然后台更加的认识它们
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Override
public String createConsumerTag(String consumerTag) {
//自己手写改动过后的
consumerTag = consumerTag + "-" + num++;
return consumerTag;
//默认返回的就是,rabbitMq自动给咱们起好的名字
//return consumerTag;
}
});
//使用MessageListenerAdapter类进行消息接收并处理
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
//Jackson2JsonMessageConverter接收并处理Map类型的消息
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
//设置消息的转换器
messageListenerAdapter.setMessageConverter(jsonMessageConverter);
//new MyMessageDelegate() 这里使用自己的方法
messageListenerAdapter.setDelegate(new MyMessageDelegate());
//使用自己的类型转换器 默认使用反射,寻找方法名
messageListenerAdapter.setDefaultListenerMethod("myHandleMap");
//进行绑定
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
}
处理方法的参数为转换过的Map类型
// 处理方法的参数为转换过的Map类型
public void myHandleMap(Map map){
System.out.println( "处理方法的参数为转换过的Map类型" + map);
}
测试类代码:
@Test
void testSendOrderToXiaoMi() throws JsonProcessingException {
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
//变成json字符串
properties.setContentType("application/json");
MyOrder myOrder = new MyOrder();
myOrder.setId(1);
myOrder.setName("一部最新的小米手机");
myOrder.setPrice(1999);
//把普通对象变成JSON格式的字符串
ObjectMapper objectMapper = new ObjectMapper();
String jsonStr = objectMapper.writeValueAsString(myOrder);
//System.out.println( "====================" + jsonStr);
Message message = new Message(jsonStr.getBytes(), properties);
rabbitTemplate.send("","队列spring-监听器-小米",message);
}
18、使用JavaTypeMapper直接接收并处理Java对象
配置类代码:
//使用监听适配器,直接处理传过来的Java对象
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
//把工厂传进来
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置多个监听队列
simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());
//1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
simpleMessageListenerContainer.setConcurrentConsumers(1);
//创建多少个监听器,来监听消息队列
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置收到消息后的确认模式 AcknowledgeMode.AUTO 表示对消息进行自动确认
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置标签,然后台更加的认识它们
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Override
public String createConsumerTag(String consumerTag) {
//自己手写改动过后的
consumerTag = consumerTag + "-" + num++;
return consumerTag;
//默认返回的就是,rabbitMq自动给咱们起好的名字
//return consumerTag;
}
});
//使用MessageListenerAdapter类进行消息接收并处理
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
//Jackson2JsonMessageConverter接收并处理Map类型的消息
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
//信任所有的包 进行反射
defaultJackson2JavaTypeMapper.setTrustedPackages("*");
//进行映射
jsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);
//设置消息的转换器
messageListenerAdapter.setMessageConverter(jsonMessageConverter);
//new MyMessageDelegate() 这里使用自己的方法
messageListenerAdapter.setDelegate(new MyMessageDelegate());
//使用自己的类型转换器 默认使用反射,寻找方法名
messageListenerAdapter.setDefaultListenerMethod("myHandleJavaObject");
//进行绑定
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
}
处理方法的参数为转换过的Java类型代码:
public void myHandleJavaObject(MyOrder myOrder){
System.out.println( "处理方法的参数为转换过的Java类型" + myOrder);
}
测试类代码:
@Test
void testSendJavaClassObjectToXiaoMi() throws JsonProcessingException {
MessageProperties properties = new MessageProperties();
properties.setContentEncoding("UTF-8");
//变成json字符串
properties.setContentType("application/json");
//把Java的类对象转换为Json在转换为字节数组
properties.getHeaders().put("__TypeId__", "com.study.rabbitmq.p10TestSpringListenerAdaper.MyOrder");
MyOrder myOrder = new MyOrder();
myOrder.setId(1);
myOrder.setName("一部最新的小米手机-Java对象");
myOrder.setPrice(1999);
//把普通对象变成JSON格式的字符串
ObjectMapper objectMapper = new ObjectMapper();
String jsonStr = objectMapper.writeValueAsString(myOrder);
//System.out.println( "====================" + jsonStr);
Message message = new Message(jsonStr.getBytes(), properties);
rabbitTemplate.send("","队列spring-监听器-小米",message);
}
19、全局转换器-处理多种类型的消息-1
根据内容类型自定义转换器,需要使用全局消息转换器
ContentTypeDelegatingMessageConverter
配置类代码:
//使用全局转换器,对不同类型的数据进行定向转换
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
//把工厂传进来
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置多个监听队列
simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());
//1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
simpleMessageListenerContainer.setConcurrentConsumers(1);
//创建多少个监听器,来监听消息队列
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置收到消息后的确认模式 AcknowledgeMode.AUTO 表示对消息进行自动确认
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置标签,然后台更加的认识它们
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Override
public String createConsumerTag(String consumerTag) {
//自己手写改动过后的
consumerTag = consumerTag + "-" + num++;
return consumerTag;
//默认返回的就是,rabbitMq自动给咱们起好的名字
//return consumerTag;
}
});
//使用MessageListenerAdapter类进行消息接收并处理
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
//Jackson2JsonMessageConverter接收并处理Map类型的消息
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
//创建全局转换器,对不同类型的数据进行定向转换
ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
//根据不同的内容创建不同的转换器
converter.addDelegate("my-jpg", new MyMessageJPGConverter());
converter.addDelegate("my-pdf", new MyMessagePDFConverter());
//设置消息的转换器
messageListenerAdapter.setMessageConverter(converter);
//new MyMessageDelegate() 这里使用自己的方法
messageListenerAdapter.setDelegate(new MyMessageDelegate());
//使用自己的类型转换器 默认使用反射,寻找方法名
messageListenerAdapter.setDefaultListenerMethod("myHandleFile");
//进行绑定
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
}
处理方法的参数为转换过的File类型的代码
// 处理方法的参数为转换过的File类型
public void myHandleFile(File file){
System.out.println( "处理方法的参数为转换过的File类型" + file.getName());
}
读取文件的二进制流发送到中间件的代码:
//读取文件的二进制流发送到中间件
@Test
void testSendFileByteToXiaoMi() throws Exception {
//处理图片
MessageProperties properties = new MessageProperties();
//设置自己的类型
properties.setContentType("my-jpg");
properties.getHeaders().put("extName","jpg");
//读取文件的二进制
File file = new File("E:\\其他文件\\SiKi\\资料RabbitMQ\\dog.jpg");
byte[] bytes = Files.readAllBytes(file.toPath());
Message message = new Message(bytes, properties);
rabbitTemplate.send("","队列spring-监听器-小米",message);
//处理PDF文件
MessageProperties propertiesPDF = new MessageProperties();
//设置自己的类型
propertiesPDF.setContentType("my-pdf");
propertiesPDF.getHeaders().put("extName","pdf");
//读取文件的二进制
File filePDF = new File("E:\\其他文件\\SiKi\\资料RabbitMQ\\snake.pdf");
byte[] bytesPDF = Files.readAllBytes(filePDF.toPath());
Message messagePDF = new Message(bytesPDF, propertiesPDF);
rabbitTemplate.send("","队列spring-监听器-小米",messagePDF);
}
根据不同的内容创建不同的转换器的代码
package com.study.rabbitmq.p10TestSpringListenerAdaper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;
/**
* @author 武帅
* @date 2020/6/7 16:18
* @description 自定义图片转换器
* MessageConverter自定义接收数据类型
*
* 类型转换器(实现接口MessageConverter)
*/
public class MyMessagePDFConverter implements MessageConverter {
// java对象转换为Message对象
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
Message message = new Message(o.toString().getBytes(),messageProperties);
return message;
}
@Override
public Object fromMessage(Message message) {
//获取扩展名
String extName = message.getMessageProperties().getHeaders().get("extName").toString();
//做文件路径的拼接
String path = "E:/" + UUID.randomUUID().toString() + "." + extName;
//进行文件的读写
File file = new File(path);
try {
Files.copy(new ByteArrayInputStream(message.getBody()),file.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return file;
}
}
根据不同的内容创建不同的转换器的代码
package com.study.rabbitmq.p10TestSpringListenerAdaper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;
/**
* @author 武帅
* @date 2020/6/7 16:18
* @description 自定义PDF转换器
* MessageConverter自定义接收数据类型
*
* 类型转换器(实现接口MessageConverter)
*/
public class MyMessagePDFConverter implements MessageConverter {
// java对象转换为Message对象
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
Message message = new Message(o.toString().getBytes(),messageProperties);
return message;
}
@Override
public Object fromMessage(Message message) {
//获取扩展名
String extName = message.getMessageProperties().getHeaders().get("extName").toString();
//做文件路径的拼接
String path = "E:/" + UUID.randomUUID().toString() + "." + extName;
//进行文件的读写
File file = new File(path);
try {
Files.copy(new ByteArrayInputStream(message.getBody()),file.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return file;
}
}
20、使用SpringBoot的配置文件和注解进行收发消息 (这里类似于SpringBoot整合RabbitMq)
配置Application.properties
配置后,就可以直接使用RabbitTemplate的实力化bean了,连自己实例化bean都省了
第一步:
创建两个工程(这个就不多说了,很简单啊)
第二步:
使用配置文件连接消息中间件
#SpringBoot连接rabbitmq的信息
#SpringBoot连接rabbitmq的地址 如果是本机则写localhost 如果是别的机器,则写它的IP地址
spring.rabbitmq.addresses=localhost
#SpringBoot连接rabbitmq的用户名 默认是guest
spring.rabbitmq.username=guest
#SpringBoot连接rabbitmq的密码 默认是guest
spring.rabbitmq.password=guest
#SpringBoot连接rabbitmq的虚拟机 /表示根目录
spring.rabbitmq.virtual-host=/
#SpringBoot连接rabbitmq的超时时间 以毫秒为单位 这里先设为10秒(如果10秒内还没有连接到rabbitmq的话,那就退出连接)
spring.rabbitmq.connection-timeout=10000
21、注入模版对象发送消息-携带相关数据ID (以下所有的标题和代码内容都和上一个内容有关)
这里的上一个内容为20标题
创建配置类的代码:
package com.study.rabbitmq;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
* @author 武帅
* @date 2020/6/9 15:56
* @description 生产者的配置类
* @Configuration 添加了此注解 则表示这个类为配置类(用于实例化需要的Bean)
* @ComponentScan({"com.study.rabbitmq.*"}) 添加了此注解 则表示要扫描这个包下所有的组件类,并且实例化
*/
@Configuration
@ComponentScan({"com.study.rabbitmq.*"})
public class MyConfigProducer {
}
创建生产者类的代码:
package com.study.rabbitmq;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
/**
* @author 武帅
* @date 2020/6/9 16:01
* @description @Component 添加此注解 则表示自动实例化,变成组件类
*/
@Component
public class MySendComponent {
//Spring整合的RabbitMq Spring对它进行了优化,而创造出来的类RabbitTemplate 是个模板
@Autowired
RabbitTemplate rabbitTemplate;
/**
* @Author: 武帅
* @Date: 2020-06-09
* @Description:
* @param object 表示 需要发送的实际内容
* @param map 表示 需要携带的特殊信息
*@return: void
*/
public void send(Object object, Map<String,Object> map){
//这里导入的包是org.springframework.messaging.MessageHeaders
MessageHeaders messageHeaders = new MessageHeaders(map);
//这里导入的包是org.springframework.messaging.Message
Message message = MessageBuilder.createMessage(object,messageHeaders);
//创建唯一的UUID
String uuId = UUID.randomUUID().toString();
//需要传入一个字符串 保证全局唯一
//作用为: 如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
CorrelationData correlationData = new CorrelationData(uuId);
//发送消息
//第一个参数为交换机的名字
//第一个参数为交换机的路由键
//第三个参数为要发送的数据
//第四个参数为要携带相关数据的ID 用于:如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
rabbitTemplate.convertAndSend("交换机spring短信","发送短信",message,correlationData);
//应该还有一步 就是发送完短信,因该把UUID插入到数据库中。
}
}
22、确认模式
发送消息代码:
package com.study.rabbitmq;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
private MySendComponent mySendComponent;
@Test
void test() {
System.out.println("hello");
}
@Test
void sendStringToMQ(){
//需要发送的数据
String str = "hello world";
//需要携带的特殊信息
Map<String,Object> map = new HashMap<>();
map.put("收货人姓名","Jack");
map.put("收货人地址","北京");
map.put("收货人电话","110");
mySendComponent.send(str,map);
}
}
往application.properties文件中添加了其它的配置文件代码:
#SpringBoot整合rabbitmq的确认模式 默认是none
spring.rabbitmq.publisher-confirm-type=correlated
#SpringBoot整合rabbitmq的返回模式 默认是false
spring.rabbitmq.publisher-returns=true
#默认是false 则表示不通知,直接扔掉, 如果是true的话,则表示投递出错,通知生产者
spring.rabbitmq.template.mandatory=true
往MySendComponent文件中添加了其它的代码:
package com.study.rabbitmq;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
/**
* @author 武帅
* @date 2020/6/9 16:01
* @description @Component 添加此注解 则表示自动实例化,变成组件类
*/
@Component
public class MySendComponent {
//Spring整合的RabbitMq Spring对它进行了优化,而创造出来的类RabbitTemplate 是个模板
@Autowired
RabbitTemplate rabbitTemplate;
//回调函数
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack == false){
System.out.println("出错了,发送了Mq出现了问题,此时因该更新数据库,失败的原因:" + cause
+ "错误的ID为:" + correlationData);
}else{
System.out.println("发送成功,原因为:" + cause + "Id为:" + correlationData);
}
}
};
/**
* @Author: 武帅
* @Date: 2020-06-09
* @Description:
* @param object 表示 需要发送的实际内容
* @param map 表示 需要携带的特殊信息
*@return: void
*/
public void send(Object object, Map<String,Object> map){
//这里导入的包是org.springframework.messaging.MessageHeaders
MessageHeaders messageHeaders = new MessageHeaders(map);
//这里导入的包是org.springframework.messaging.Message
Message message = MessageBuilder.createMessage(object,messageHeaders);
//创建唯一的UUID
String uuId = UUID.randomUUID().toString();
//需要传入一个字符串 保证全局唯一
//作用为: 如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
CorrelationData correlationData = new CorrelationData(uuId);
//回调函数 如果出错,则调用此函数
rabbitTemplate.setConfirmCallback(confirmCallback);
//发送消息
//第一个参数为交换机的名字
//第一个参数为交换机的路由键
//第三个参数为要发送的数据 这里暂时没有数据,数据由外边的类来提供
//第四个参数为要携带相关数据的ID 用于:如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
rabbitTemplate.convertAndSend("交换机spring短信","发送短信",message,correlationData);
//应该还有一步 就是发送完短信,因该把UUID插入到数据库中。
}
}
23、返回模式
发送消息代码: 和上面确认模式的一样
application.properties文件: 和上面确认模式的一样
往MySendComponent文件中添加了其它的代码:
package com.study.rabbitmq;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
/**
* @author 武帅
* @date 2020/6/9 16:01
* @description @Component 添加此注解 则表示自动实例化,变成组件类
*/
@Component
public class MySendComponent {
//Spring整合的RabbitMq Spring对它进行了优化,而创造出来的类RabbitTemplate 是个模板
@Autowired
RabbitTemplate rabbitTemplate;
//回调函数
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack == true){
System.out.println("出错了,发送了Mq出现了问题,此时因该更新数据库,失败的原因:" + cause
+ "错误的ID为:" + correlationData);
}else{
System.out.println("发送成功,原因为:" + cause + "Id为:" + correlationData);
}
}
};
RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("replyCode :" + replyCode );
System.out.println("replyText :" + replyText );
System.out.println("exchange :" + exchange );
System.out.println("routingKey :" + routingKey );
System.out.println("message :" + message );
}
};
/**
* @Author: 武帅
* @Date: 2020-06-09
* @Description:
* @param object 表示 需要发送的实际内容
* @param map 表示 需要携带的特殊信息
*@return: void
*/
public void send(Object object, Map<String,Object> map){
//这里导入的包是org.springframework.messaging.MessageHeaders
MessageHeaders messageHeaders = new MessageHeaders(map);
//这里导入的包是org.springframework.messaging.Message
Message message = MessageBuilder.createMessage(object,messageHeaders);
//创建唯一的UUID
String uuId = UUID.randomUUID().toString();
//需要传入一个字符串 保证全局唯一
//作用为: 如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
CorrelationData correlationData = new CorrelationData(uuId);
//回调函数 如果出错,则调用此函数
rabbitTemplate.setConfirmCallback(confirmCallback);
//返回模式
rabbitTemplate.setReturnCallback(returnCallback);
//发送消息
//第一个参数为交换机的名字
//第一个参数为交换机的路由键
//第三个参数为要发送的数据 这里暂时没有数据,数据由外边的类来提供
//第四个参数为要携带相关数据的ID 用于:如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
rabbitTemplate.convertAndSend("交换机spring短信","发送短信",message,correlationData);
//应该还有一步 就是发送完短信,因该把UUID插入到数据库中。
}
}
24、使用RabbitListener注解消费数据并手动确认
@RabbitListener是一个组合注解,可以组合其它注解一起来完成对队列的消息接收和监听,主要包括@QueueBinding、@Queue、@Exchange
此时是在rabbitmqConsumer工程里写代码
application.properties文件的代码
#SpringBoot连接rabbitmq的信息
#SpringBoot连接rabbitmq的地址 如果是本机则写localhost 如果是别的机器,则写它的IP地址
spring.rabbitmq.addresses=localhost
#SpringBoot连接rabbitmq的用户名 默认是guest
spring.rabbitmq.username=guest
#SpringBoot连接rabbitmq的密码 默认是guest
spring.rabbitmq.password=guest
#SpringBoot连接rabbitmq的虚拟机 /表示根目录
spring.rabbitmq.virtual-host=/
#SpringBoot连接rabbitmq的超时时间 以毫秒为单位 这里先设为10秒(如果10秒内还没有连接到rabbitmq的话,那就退出连接)
spring.rabbitmq.connection-timeout=10000
#rabbitmq的手动确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#rabbitmq的监听对象的数量
spring.rabbitmq.listener.simple.concurrency=1
#SpringBoot为我们创建5个对象进行同时监听
spring.rabbitmq.listener.simple.max-concurrency=5
配置类的代码:
package com.study.rabbitmq;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
* @author 武帅
* @date 2020/6/11 11:03
* @description @ComponentScan({"com.study.rabbitmq.*"}) 表示扫描包下所有的类
*/
@Configuration
@ComponentScan({"com.study.rabbitmq.*"})
public class MyConfigConsumer {
}
组件类的代码
package com.study.rabbitmq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 武帅
* @date 2020/6/11 11:05
* @description @Component 表示这是一个组件类.会被自动的实例化,进行一个注入
*/
@Component
public class MyRecvComponent {
//需要引入 import org.springframework.messaging.Message;
//需要引入 import com.rabbitmq.client.Channel;
@RabbitListener(
//queues = {"队列spring短信"} 监听的队列
queues = {"队列spring短信"}
)
@RabbitHandler
public void onMessage(Message message, Channel channel){
System.out.println("收到了: " + message.getPayload());
//获取消息唯一Id
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//因为现在是手动确认模式,所以要设置basicAck
//第一个参数 包裹的一个标签 是RabbitMq为我们自动添加的唯一Id
//第二个参数 不自动批量
try {
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
25、使用RabbitListener复合注解 创建队列交换机和绑定
application.properties文件的代码 和上面的一样
配置类的代码: 和上面的一样
组件类的代码:
package com.study.rabbitmq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 武帅
* @date 2020/6/11 11:05
* @description @Component 表示这是一个组件类.会被自动的实例化,进行一个注入
*/
@Component
public class MyRecvComponent {
//需要引入 import org.springframework.messaging.Message;
//需要引入 import com.rabbitmq.client.Channel;
@RabbitListener(
//queues = {"队列spring短信"} 监听的队列
queues = {"队列spring短信"}
)
@RabbitHandler
public void onMessage(Message message, Channel channel){
System.out.println("收到了: " + message.getPayload());
//获取消息唯一Id
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//因为现在是手动确认模式,所以要设置basicAck
//第一个参数 包裹的一个标签 是RabbitMq为我们自动添加的唯一Id
//第二个参数 不自动批量
try {
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
// =========================================================================================
//需要引入 import org.springframework.messaging.Message;
//需要引入 import com.rabbitmq.client.Channel;
//使用复合注解创建队列
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "队列-注解创建的",declare = "true",exclusive = "false",autoDelete = "false"),
exchange = @Exchange(value = "交换机-注解创建的",durable = "true", type = "direct"),
key = "新路由键"
)
)
@RabbitHandler
public void onMessageNewQueue(Message message, Channel channel){
System.out.println("NewQue收到了: " + message.getPayload());
//获取消息唯一Id
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//因为现在是手动确认模式,所以要设置basicAck
//第一个参数 包裹的一个标签 是RabbitMq为我们自动添加的唯一Id
//第二个参数 不自动批量
try {
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
26、使用注解直接发送和接收java自定义对象
要求:实体类必须实现接口 Serializable
实体类最好定义 序列号ID
1、首先在生产端定义一个实体类
package com.study.rabbitmq;
import java.io.Serializable;
/**
* @author 武帅
* @date 2020/6/12 15:35
* @description
*/
public class MyUser implements Serializable {
//随便给一个UID即可
private static final long serialVersionUID = 870292457463066666L;
private String userName;
private Integer age;
private long exd;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public long getExd() {
return exd;
}
public void setExd(long exd) {
this.exd = exd;
}
public MyUser() { }
public MyUser(String userName, Integer age, long exd) {
this.userName = userName;
this.age = age;
this.exd = exd;
}
}
2、然后再组件类里添加其余的代码:
public void sendMyUser(MyUser user){
//回调函数 如果出错,则调用此函数
rabbitTemplate.setConfirmCallback(confirmCallback);
//返回模式
rabbitTemplate.setReturnCallback(returnCallback);
//创建唯一的UUID
String uuId = UUID.randomUUID().toString();
//需要传入一个字符串 保证全局唯一
//作用为: 如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
CorrelationData correlationData = new CorrelationData(uuId);
rabbitTemplate.convertAndSend("交换机-注解创建的","新路由键",user,correlationData);
}
3、测试类里添加其余的代码:
@Test
void SendUserToMQ(){
MyUser user = new MyUser("张三",18,2000L);
mySendComponent.sendMyUser(user);
}
4、在切换为消费端
5、在消费端直接粘贴生产端的实体类
6、在组件类里添加其余的代码:
//需要引入 import org.springframework.messaging.Message;
//需要引入 import com.rabbitmq.client.Channel;
@RabbitListener(
//queues = {"队列spring短信"} 监听的队列
queues = {"队列-注解创建的"}
)
@RabbitHandler
public void onMessageUser(@Payload MyUser user, Channel channel, @Headers Map<String,Object> headers){
System.out.println("收到了: " + user.getUserName());
System.out.println("收到了: " + user.getAge());
System.out.println("收到了: " + user.getExd());
//获取消息唯一Id
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//因为现在是手动确认模式,所以要设置basicAck
//第一个参数 包裹的一个标签 是RabbitMq为我们自动添加的唯一Id
//第二个参数 不自动批量
try {
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
好了到此RabbitMQ的基础部分就结束了
