RabbitMQ(基本使用2)

这是和RabbitMQ(基本使用)文章 一起使用

18、RabbitMQ和Spring的整合
1、 重要的类

RabbitAdmin可以很好的处理交换机和队列,修改,删除并且可以注解到spring里

RabbitTemplate类负责收发消息

2、创建SpringBoot工程
<!-- 添加rabbitmq的基本依赖 -->   
<dependency>
    <groupid>com.rabbitmq</groupid>
    <artifactid>amqp-client<artifactid>
</dependency>
    
 <!-- Spring对rabbitmq支持的依赖 -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>
 
<!-- SpringBoot对rabbitmq支持的依赖 -->
<dependency>
    <groupid>com.springframework.boot<groupid>
    <artifactid>spring-boot-starter-amqp<artifactid>
</dependency>
        
版本使用boot默认
3、创建一个配置类
package com.study.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 武帅
 * @date 2020/5/31 10:04
 * @description
 */
//等价于配置文件(***.xml)
@Configuration
public class ConfigurationRabbitmq {

    //连接工厂
    //ConnectionFactory 注意不再是rabbit里的了,而是spring里面的
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("localhost:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }



    //管理交换机和队列的RabbitAdmin
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }


}
4、RabbitAdmin声明队列交换机并进行绑定
package com.study.rabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private RabbitAdmin rabbitAdmin;


    @Test
    void test测试产生队列交换机进行绑定(){

        //需要导入这个类 import org.springframework.amqp.core.Exchange;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否自动删除
        //第四个参数 携带的头信息 这里暂时没有
        Exchange exchange = new DirectExchange("交换机spring短信",true,false,null);

        //创建交换机
        rabbitAdmin.declareExchange(exchange);


        //需要导入这个类 import org.springframework.amqp.core.Queue;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否独占 (一般不使用独占,除非是特殊业务)
        //第四个参数 是否自动删除
        Queue queue = new Queue("队列spring短信",true,false,false);

        //创建队列
        rabbitAdmin.declareQueue(queue);


        //需要引入这个包 import org.springframework.amqp.core.Binding;
        //第一个参数 需要绑定的队列名字
        //第二个参数 绑定的类型 (是一个枚举)
        //第三个参数 需要绑定的交换机名字
        //第四个参数 路由键 (这里是直连型交换机。路由键必须保持一致)
        //第五个参数 携带的头信息 这里暂时没有
        Binding binding = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信","发送短信",null);

        //进行绑定
        rabbitAdmin.declareBinding(binding);

    }

}
5、RabbitAdmin创建四种类型的交换机并绑定到一个队列

​ 代码如下:

package com.study.rabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private RabbitAdmin rabbitAdmin;


    @Test
    void test测试产生队列交换机进行绑定(){

        //创建直连型交换机
        //需要导入这个类 import org.springframework.amqp.core.Exchange;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否自动删除
        //第四个参数 携带的头信息 这里暂时没有
        Exchange exchangeDirect = new DirectExchange("交换机spring短信",true,false,null);

        //把上面的值进行入参
        rabbitAdmin.declareExchange(exchangeDirect);


        //创建fanout型交换机
        //需要导入这个类 import org.springframework.amqp.core.Exchange;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否自动删除
        //第四个参数 携带的头信息 这里暂时没有
        Exchange exchangeFanout = new FanoutExchange("交换机spring短信-fanout",true,false,null);

        //把上面的值进行入参
        rabbitAdmin.declareExchange(exchangeFanout);


        //创建topic型交换机
        //需要导入这个类 import org.springframework.amqp.core.Exchange;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否自动删除
        //第四个参数 携带的头信息 这里暂时没有
        Exchange exchangeTopicStar = new TopicExchange("交换机spring短信-topic",true,false,null);

        //把上面的值进行入参
        rabbitAdmin.declareExchange(exchangeTopicStar);


        //创建header型交换机
        //需要导入这个类 import org.springframework.amqp.core.Exchange;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否自动删除
        //第四个参数 携带的头信息 这里暂时没有
        Exchange exchangeHeader = new HeadersExchange("交换机spring短信-header",true,false,null);

        //把上面的值进行入参
        rabbitAdmin.declareExchange(exchangeHeader);



        //创建队列 这里就创建了一个队列和4个交换机进行绑定
        //需要导入这个类 import org.springframework.amqp.core.Queue;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否独占 (一般不使用独占,除非是特殊业务)
        //第四个参数 是否自动删除
        Queue queue = new Queue("队列spring短信",true,false,false);

        //把上面的值传递过来
        rabbitAdmin.declareQueue(queue);


        //使用直连型交换机和队列进行绑定
        //需要引入这个包 import org.springframework.amqp.core.Binding;
        //第一个参数 需要绑定的队列名字
        //第二个参数 绑定的类型 (是一个枚举)
        //第三个参数 需要绑定的交换机名字
        //第四个参数 路由键 (这里是直连型交换机。路由键必须保持一致)
        //第五个参数 携带的头信息 这里暂时没有
        Binding binding = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信","发送短信",null);

        //把上面的参数传递过来
        rabbitAdmin.declareBinding(binding);


        //使用fanout型交换机和队列进行绑定
        //需要引入这个包 import org.springframework.amqp.core.Binding;
        //第一个参数 需要绑定的队列名字
        //第二个参数 绑定的类型 (是一个枚举)
        //第三个参数 需要绑定的交换机名字
        //第四个参数 路由键 (这里是fanout型交换机。没有路由键也可以)
        //第五个参数 携带的头信息 这里暂时没有
        Binding bindingFanout = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-fanout","",null);

        //把上面的参数传递过来
        rabbitAdmin.declareBinding(bindingFanout);

        //使用topic型交换机和队列进行绑定
        //需要引入这个包 import org.springframework.amqp.core.Binding;
        //第一个参数 需要绑定的队列名字
        //第二个参数 绑定的类型 (是一个枚举)
        //第三个参数 需要绑定的交换机名字
        //第四个参数 路由键 (这里是topic型交换机)
        //                  .*代表没有,或者一个(不能多个)
        //第五个参数 携带的头信息 这里暂时没有
        Binding bindingTopic = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-topic","0086.*",null);

        //把上面的参数传递过来
        rabbitAdmin.declareBinding(bindingTopic);


        //header型交换机的参数 这里的all表示除去第一个参数,其余的都得有和正确才行, any表示有一个就可以
        Map<String,Object> map = new HashMap<>();
        map.put("x-match","all");
        map.put("userName","华为");
        map.put("passWord","123456");

        //使用header型交换机和队列进行绑定
        //需要引入这个包 import org.springframework.amqp.core.Binding;
        //第一个参数 需要绑定的队列名字
        //第二个参数 绑定的类型 (是一个枚举)
        //第三个参数 需要绑定的交换机名字
        //第四个参数 路由键 (这里是header型交换机。没有路由键也可以)
        //第五个参数 携带的头信息 这里暂时没有
        Binding bindingHeader = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-header","",map);

        //把上面的参数传递过来
        rabbitAdmin.declareBinding(bindingHeader);





    }

}

6、RabbitAdmin清空和删除队列API

​ 代码如下:

    //测试删除队列和清除队列消息   Purge : 清除
    @Test
    void testDeleteQueuesAndPurgeQueueMessage(){

        //清除队列消息 只有这个队列存在和队列名字正确,才可以清除,系统不报错
        rabbitAdmin.purgeQueue("队列topic*");


        //删除队列
        rabbitAdmin.deleteQueue("队列spring短信");

        //删除交换机
        rabbitAdmin.deleteExchange("交换机spring短信");
        rabbitAdmin.deleteExchange("交换机spring短信-fanout");
        rabbitAdmin.deleteExchange("交换机spring短信-topic");
        rabbitAdmin.deleteExchange("交换机spring短信-header");

    }
7、使用Bean注解创建交换机和队列绑定

也就是使用Spring创建交换机和队列

代码如下:

@Configuration
public class ConfigurationRabbitmq //这个类里多出来的代码
    
    //使用Bean注解创建队列  也就是使用Spring创建队列
    @Bean
    public Queue queueSMS(){
        //创建队列
        //需要导入这个类 import org.springframework.amqp.core.Queue;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否独占 (一般不使用独占,除非是特殊业务)
        //第四个参数 是否自动删除
        Queue queue = new Queue("队列spring短信",true,false,false);
        return queue;
    }


    //使用Bean注解创建直连型交换机  也就是使用Spring创建直连型交换机
    //如果嫌这种入参方式麻烦,也可以使用链式编程,连续...的方式 
                            //queueSMS()表示队列的名字
                            //exchangeDirect()表示交换机的名字
                            //"发送短信" 表示 路由键
                            //and(null); null 表示附加的值
                            //Binding binding =       BindingBuilder.bind(queueSMS()).to(exchangeDirect()).with("发送短信").and(null);
    @Bean
    public Exchange exchangeDirect(){
        //创建直连型交换机
        //需要导入这个类 import org.springframework.amqp.core.Exchange;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否自动删除
        //第四个参数 携带的头信息 这里暂时没有
        Exchange exchangeDirect = new DirectExchange("交换机spring短信",true,false,null);
        return exchangeDirect;
    }


    //使用Bean注解创建fanout交换机  也就是使用Spring创建fanout交换机
    @Bean
    public Exchange exchangeFanout(){
        //创建fanout型交换机
        //需要导入这个类 import org.springframework.amqp.core.Exchange;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否自动删除
        //第四个参数 携带的头信息 这里暂时没有
        Exchange exchangeFanout = new FanoutExchange("交换机spring短信-fanout",true,false,null);
        return exchangeFanout;
    }


    //使用Bean注解创建topic交换机  也就是使用Spring创建topic交换机
    @Bean
    public Exchange exchangeTopicStar(){
        //创建topic型交换机
        //需要导入这个类 import org.springframework.amqp.core.Exchange;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否自动删除
        //第四个参数 携带的头信息 这里暂时没有
        Exchange exchangeTopicStar = new TopicExchange("交换机spring短信-topic",true,false,null);
        return exchangeTopicStar;
    }


    //使用Bean注解创建header交换机  也就是使用Spring创建header交换机
    @Bean
    public Exchange exchangeHeader(){
        //创建header型交换机
        //需要导入这个类 import org.springframework.amqp.core.Exchange;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否自动删除
        //第四个参数 携带的头信息 这里暂时没有
        Exchange exchangeHeader = new HeadersExchange("交换机spring短信-header",true,false,null);
        return exchangeHeader;
    }

    //使用Bean注解绑定直连型交换机
    @Bean
    public Binding bindingDirect(){
        Binding binding = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信","发送短信",null);
        return binding;
    }

    //使用Bean注解绑定fanout型交换机
    @Bean
    public Binding bindingFanout(){
        Binding bindingFanout = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-fanout","",null);
        return bindingFanout;
    }

    //使用Bean注解绑定topic型交换机
    @Bean
    public Binding bindingTopic(){
        Binding bindingTopic = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-topic","0086.*",null);
        return bindingTopic;
    }

    //使用Bean注解绑定header型交换机
    @Bean
    public Binding bindingHeader(){
        //header型交换机的参数 这里的all表示除去第一个参数,其余的都得有和正确才行, any表示有一个就可以
        Map<String,Object> map = new HashMap<>();
        map.put("x-match","all");
        map.put("userName","华为");
        map.put("passWord","123456");

        Binding bindingHeader = new Binding("队列spring短信",Binding.DestinationType.QUEUE,"交换机spring短信-header","",map);
        return bindingHeader;
    }
   

测试类里的代码:

    @Autowired
    private Binding bindingDirect;

    @Autowired
    private Binding bindingFanout;

    @Autowired
    private Binding bindingTopic;

    @Autowired
    private Binding bindingHeader;

    //使用Bean注解创建交换机和队列绑定  也就是使用Spring创建交换机和队列
    @Test
    void testDefineQueueExchangeBindingBySpring(){
        rabbitAdmin.declareBinding(bindingDirect);
        rabbitAdmin.declareBinding(bindingFanout);
        rabbitAdmin.declareBinding(bindingTopic);
        rabbitAdmin.declareBinding(bindingHeader);
    }
8、RabbitTemplate使用convertAndSend方法发送消息

配置类的代码:

    //创建一个可以收发消息的模板 (这个平时用的比较多)
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

测试类代码:

@SpringBootTest
class TestRabbitmqTemplate {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    //使用RabbitMq模板给交换机发送消息 使用convertAndSend()方法
    @Test
    void TestSimpleSend(){

        //第一个参数 交换机的名字
        //第二个参数 路由键 (因为是直连型交换机,所以路由键必须一致)
        //第三个参数 发送的具体信息 (这里不用使用 .getBytes()方法了)
        //也可以使用第4个参数 携带的头信息
        String sendSMS = "直连型交换机发送短信";
        rabbitTemplate.convertAndSend("交换机spring短信", "发送短信", sendSMS, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().getHeaders().put("登录次数",2);
                return message;
            }
        });


    }


}
9、RabbitTemplate使用Send方法发送消息

​ 代码如下:

//使用RabbitMq模板给交换机发送消息 使用send()方法
    @Test
    void testSend(){

        String sendSMS = "使用send()方法来发送短信";

        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");
        properties.getHeaders().put("登录次数",3);

        Message message = new Message(sendSMS.getBytes(),properties);

        rabbitTemplate.send("交换机spring短信-fanout","",message);
    }


    //使用RabbitMq模板给交换机发送消息 使用send()方法
    @Test
    void testSendTopicAndHeaders(){

        //给Topic交换机上发送消息
        String sendSMSTopic = "使用send()方法来发送短信---Topic型交换机";

        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");
        properties.getHeaders().put("登录次数",4);

        Message message = new Message(sendSMSTopic.getBytes(),properties);

        rabbitTemplate.send("交换机spring短信-topic","0086.18734910102",message);


        //给Header交换机上发送消息
        String sendSMSHeader = "使用send()方法来发送短信---Header型交换机";

        MessageProperties propertiesHeader = new MessageProperties();
        propertiesHeader.setContentEncoding("UTF-8");
        propertiesHeader.getHeaders().put("userName","华为");
        propertiesHeader.getHeaders().put("passWord","123456");
        propertiesHeader.getHeaders().put("登录次数",5);


        Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);

        rabbitTemplate.send("交换机spring短信-header","",messageHeaders);
    }
10、RabbitTemplate使用Receive方法接收消息
package com.study.rabbitmq.p02TestRabbitmqTemplate;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Map;

/**
 * @author 武帅
 * @date 2020/6/5 09:26
 * @description
 */
@SpringBootTest
class TestRabbitmqTemplateReceive {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    //使用RabbitMq模板接收消息
    @Test
    void testReceive(){

        //一般不怎么使用while循环true 这里只是为了测试用下
        while (true){
            //第一个参数 交换机的名字
            //第二个参数 毫秒数 如果2秒中内,没有消息就退出这个消息
            Message message = rabbitTemplate.receive("队列spring短信", 2000);

            if(message == null){
                break;
            }
            String str = new String(message.getBody());
            System.out.println("收到了" + str);

            //获取头信息
            Map<String, Object> headers = message.getMessageProperties().getHeaders();

            //遍历头信息
            for (Map.Entry<String, Object> entry : headers.entrySet()) {
                System.out.println(entry);
            }

        }
    }



}
11、SimpleMessageListenerContainer基本属性和标识策略, 使用MessageListener接收并处理消息

要在SpringBoot下启动,如果在测试单元下,则会出现问题

package com.study.rabbitmq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * @author 武帅
 * @date 2020/6/5 11:14
 * @description 消费者的配置文件
 */
@Configuration
public class ConfigurationConsumer {


    //使用Bean注解创建队列  也就是使用Spring创建队列
    @Bean
    public Queue queueXiaoMi(){
        //创建队列
        //需要导入这个类 import org.springframework.amqp.core.Queue;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否独占 (一般不使用独占,除非是特殊业务)
        //第四个参数 是否自动删除
        Queue queue = new Queue("队列spring-监听器-小米",true,false,false);
        return queue;
    }

    //使用Bean注解创建队列  也就是使用Spring创建队列
    @Bean
    public Queue queueHongMi(){
        //创建队列
        //需要导入这个类 import org.springframework.amqp.core.Queue;
        //第一个参数 名字(最好是英文)见名知意
        //第二个参数 是否要持久化到本地磁盘中
        //第三个参数 是否独占 (一般不使用独占,除非是特殊业务)
        //第四个参数 是否自动删除
        Queue queue = new Queue("队列spring-监听器-红米",true,false,false);
        return queue;
    }


    private Integer num = 0;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
        //把工厂传进来
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        //设置多个监听队列
        simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());

        //1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
        simpleMessageListenerContainer.setConcurrentConsumers(1);

        //创建多少个监听器,来监听消息队列
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);

        //设置收到消息后的确认模式  AcknowledgeMode.AUTO表示为自动模式
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        //设置标签,然后台更加的认识它们
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){

            @Override
            public String createConsumerTag(String consumerTag) {

                //自己手写改动过后的
                consumerTag = consumerTag + "-" + num++;
                return consumerTag;

                //默认返回的就是,rabbitMq自动给咱们起好的名字
                //return consumerTag;
            }
        });

        //设置消息监听器
        simpleMessageListenerContainer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //获取出消息,在打印
                String oneData = new String(message.getBody());
                System.out.println( "收到了消息:" + oneData);

                //获取信息的自定义内容
                Map<String, Object> headers = message.getMessageProperties().getHeaders();

                //遍历获取信息的自定义内容
                for (Map.Entry<String, Object> entry : headers.entrySet()) {
                    System.out.println(entry);
                }

            }
        });


        return simpleMessageListenerContainer;
    }

}

使用生产者生产数据,测试数据

//使用RabbitMq模板给交换机发送消息 使用send()方法
    @Test
    void testSendTopicAndHeaders(){

        //给Topic交换机上发送消息
        String sendSMSTopic = "使用send()方法来发送短信---Topic型交换机";

        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");
        properties.getHeaders().put("登录次数",4);

        Message message = new Message(sendSMSTopic.getBytes(),properties);

        rabbitTemplate.send("交换机spring短信-topic","0086.18734910102",message);


        //给Header交换机上发送消息
        String sendSMSHeader = "使用send()方法来发送短信---Header型交换机";

        MessageProperties propertiesHeader = new MessageProperties();
        propertiesHeader.setContentEncoding("UTF-8");
        propertiesHeader.getHeaders().put("userName","华为");
        propertiesHeader.getHeaders().put("passWord","123456");
        propertiesHeader.getHeaders().put("登录次数",5);


        Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);

        rabbitTemplate.send("交换机spring短信-header","",messageHeaders);
    }
12、使用ChannelAwareMessageListener接收消息-手动确认消息
//测试手动接受消息确认
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
        //把工厂传进来
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        //设置多个监听队列
        simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());

        //1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
        simpleMessageListenerContainer.setConcurrentConsumers(1);

        //创建多少个监听器,来监听消息队列
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);

        //设置收到消息后的确认模式  AcknowledgeMode.MANUAL表示手动接收消息
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //设置标签,然后台更加的认识它们
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){

            @Override
            public String createConsumerTag(String consumerTag) {

                //自己手写改动过后的
                consumerTag = consumerTag + "-" + num++;
                return consumerTag;

                //默认返回的就是,rabbitMq自动给咱们起好的名字
                //return consumerTag;
            }
        });

        //设置消息监听器
        simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                //获取出消息,在打印
                String oneData = new String(message.getBody());
                System.out.println( "收到了消息:" + oneData);

                //获取信息的自定义内容
                Map<String, Object> headers = message.getMessageProperties().getHeaders();

                //遍历获取信息的自定义内容
                for (Map.Entry<String, Object> entry : headers.entrySet()) {
                    System.out.println(entry);
                }

                //对消息进行了手动的确认  第二个参数表示:是否批量(这里我们是一条一条处理,所以不用批量)
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

                //也可以使用这种方法 nack增加了重回队列的参数,最后一个false代表不用重回
                //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);



            }

        });

        return simpleMessageListenerContainer;
    }

使用生产者生产数据,测试数据

//使用RabbitMq模板给交换机发送消息 使用send()方法
    @Test
    void testSendTopicAndHeaders(){

        //给Topic交换机上发送消息
        String sendSMSTopic = "使用send()方法来发送短信---Topic型交换机";

        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");
        properties.getHeaders().put("登录次数",4);

        Message message = new Message(sendSMSTopic.getBytes(),properties);

        rabbitTemplate.send("交换机spring短信-topic","0086.18734910102",message);


        //给Header交换机上发送消息
        String sendSMSHeader = "使用send()方法来发送短信---Header型交换机";

        MessageProperties propertiesHeader = new MessageProperties();
        propertiesHeader.setContentEncoding("UTF-8");
        propertiesHeader.getHeaders().put("userName","华为");
        propertiesHeader.getHeaders().put("passWord","123456");
        propertiesHeader.getHeaders().put("登录次数",5);


        Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);

        rabbitTemplate.send("交换机spring短信-header","",messageHeaders);
    }
13、 使用MessageListenerAdapter类进行消息接收并处理 消息监听适配器(在实际中运用的比较多)

使用MessageListenerAdapter处理器进行消息队列监听处理,如果容器没有设置setDefaultListenerMethod,则处理器中默认的处理方法名是handleMessage,如果设置了setDefaultListenerMethod,则处理器中处理消息的方法名就是setDefaultListenerMethod方法参数设置的值。也可以通过setQueueOrTagToMethodName方法为不同的队列设置不同的消息处理方法

自定义消息处理类,注意方法名和参数类型必须要一致

首先自己先定义一个MessageDelegate

package com.study.rabbitmq.p10TestSpringListenerAdaper;

/**
 * @author 武帅
 * @date 2020/6/7 10:55
 * @description
 */
public class MyMessageDelegate {

    public void handleMessage(byte[] body){
        String strMessage = new String(body);
        System.out.println("我收到了:" + strMessage);
    }


}

然后再自己写监听适配器对消息进行接受和消费

    //使用监听适配器对消息进行接受和消费
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
        //把工厂传进来
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        //设置多个监听队列
        simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());

        //1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
        simpleMessageListenerContainer.setConcurrentConsumers(1);

        //创建多少个监听器,来监听消息队列
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);

        //设置收到消息后的确认模式  AcknowledgeMode.AUTO 表示对消息进行自动确认
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        //设置标签,然后台更加的认识它们
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){

            @Override
            public String createConsumerTag(String consumerTag) {

                //自己手写改动过后的
                consumerTag = consumerTag + "-" + num++;
                return consumerTag;

                //默认返回的就是,rabbitMq自动给咱们起好的名字
                //return consumerTag;
            }
        });

        //使用MessageListenerAdapter类进行消息接收并处理
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        //new MyMessageDelegate() 这里使用自己的方法
        messageListenerAdapter.setDelegate(new MyMessageDelegate());

        //进行绑定
        simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);

        return simpleMessageListenerContainer;
    }

发送数据

   //使用RabbitMq模板给交换机发送消息 使用send()方法
    @Test
    void testSendTopicAndHeaders(){

        //给Topic交换机上发送消息
        String sendSMSTopic = "使用send()方法来发送短信---Topic型交换机";

        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");
        properties.getHeaders().put("登录次数",4);

        Message message = new Message(sendSMSTopic.getBytes(),properties);

        rabbitTemplate.send("交换机spring短信-topic","0086.18734910102",message);


        //给Header交换机上发送消息
        String sendSMSHeader = "使用send()方法来发送短信---Header型交换机";

        MessageProperties propertiesHeader = new MessageProperties();
        propertiesHeader.setContentEncoding("UTF-8");
        propertiesHeader.getHeaders().put("userName","华为");
        propertiesHeader.getHeaders().put("passWord","123456");
        propertiesHeader.getHeaders().put("登录次数",5);


        Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);

        rabbitTemplate.send("交换机spring短信-header","",messageHeaders);
    }

==注意:这里和上面的使用方法为:必须是先使用测试单元,添加数据(添加之前先注释自己写好的@Bean),然后再解除注释,然后再次启动SpringBoot==

14、MessageListenerAdapter自定义方法接收消息-绑定队列和处理方法

自定义方法接收消息的代码:

    //自定义的处理消息的方法名称
    public void myHandleMessage(byte[] body){
        String strMessage = new String(body);
        System.out.println("自定义名称的方法 我收到了:" + strMessage);
    }

​ 配置类的代码

    //使用监听适配器对消息进行接受和消费
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
        //把工厂传进来
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        //设置多个监听队列
        simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());

        //1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
        simpleMessageListenerContainer.setConcurrentConsumers(1);

        //创建多少个监听器,来监听消息队列
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);

        //设置收到消息后的确认模式  AcknowledgeMode.AUTO 表示对消息进行自动确认
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        //设置标签,然后台更加的认识它们
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){

            @Override
            public String createConsumerTag(String consumerTag) {

                //自己手写改动过后的
                consumerTag = consumerTag + "-" + num++;
                return consumerTag;

                //默认返回的就是,rabbitMq自动给咱们起好的名字
                //return consumerTag;
            }
        });

        //使用MessageListenerAdapter类进行消息接收并处理
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        //new MyMessageDelegate() 这里使用自己的方法
        messageListenerAdapter.setDelegate(new MyMessageDelegate());

        //使用自己定义的MessageDelegate
        messageListenerAdapter.setDefaultListenerMethod("myHandleMessage");

        //进行绑定
        simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);

        return simpleMessageListenerContainer;
    }

生产者生产的代码

    //使用RabbitMq模板给交换机发送消息 使用send()方法
    @Test
    void testSendTopicAndHeaders(){

        //给Topic交换机上发送消息
        String sendSMSTopic = "使用send()方法来发送短信---Topic型交换机";

        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");
        properties.getHeaders().put("登录次数",4);

        Message message = new Message(sendSMSTopic.getBytes(),properties);

        rabbitTemplate.send("交换机spring短信-topic","0086.18734910102",message);


        //给Header交换机上发送消息
        String sendSMSHeader = "使用send()方法来发送短信---Header型交换机";

        MessageProperties propertiesHeader = new MessageProperties();
        propertiesHeader.setContentEncoding("UTF-8");
        propertiesHeader.getHeaders().put("userName","华为");
        propertiesHeader.getHeaders().put("passWord","123456");
        propertiesHeader.getHeaders().put("登录次数",5);


        Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);

        rabbitTemplate.send("交换机spring短信-header","",messageHeaders);
    }

绑定队列和处理方法de的代码:

 //专用于处理消息队列-小米手机的方法
    public void myHandleMessageXiaoMi(byte[] body){
        String strMessage = new String(body);
        System.out.println("自定义名称的方法-小米 我收到了:" + strMessage);
    }


    //专用于处理消息队列-红米手机的方法
    public void myHandleMessageHongMi(byte[] body){
        String strMessage = new String(body);
        System.out.println("自定义名称的方法-红米 我收到了:" + strMessage);
    }

配置类代码

//使用监听适配器,针对不同队列的消息使用不同的方法,进行处理
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
        //把工厂传进来
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        //设置多个监听队列
        simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());

        //1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
        simpleMessageListenerContainer.setConcurrentConsumers(1);

        //创建多少个监听器,来监听消息队列
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);

        //设置收到消息后的确认模式  AcknowledgeMode.AUTO 表示对消息进行自动确认
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        //设置标签,然后台更加的认识它们
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){

            @Override
            public String createConsumerTag(String consumerTag) {

                //自己手写改动过后的
                consumerTag = consumerTag + "-" + num++;
                return consumerTag;

                //默认返回的就是,rabbitMq自动给咱们起好的名字
                //return consumerTag;
            }
        });

        //使用MessageListenerAdapter类进行消息接收并处理
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        //new MyMessageDelegate() 这里使用自己的方法
        messageListenerAdapter.setDelegate(new MyMessageDelegate());

        //这个Map记录了哪个队列使用哪个方法 key为队列的名字 value为方法的名字
        Map<String,String> queueMethod = new HashMap<>();
        queueMethod.put("队列spring-监听器-小米","myHandleMessageXiaoMi");
        queueMethod.put("队列spring-监听器-红米","myHandleMessageHongMi");

        //通过队列的名字来和方法的名字进行绑定  (此处使用Map集合)
        messageListenerAdapter.setQueueOrTagToMethodName(queueMethod);


        //进行绑定
        simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);

        return simpleMessageListenerContainer;
    }

生产者的代码:

@Test
    void testSendToXiaoMiAndHongMi(){

        //给Topic交换机上发送消息
        String sendSMSTopic = "一部小米手机";

        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");


        Message message = new Message(sendSMSTopic.getBytes(),properties);

        rabbitTemplate.send("","队列spring-监听器-小米",message);


        //给Header交换机上发送消息
        String sendSMSHeader = "一部红米手机";

        MessageProperties propertiesHeader = new MessageProperties();
        propertiesHeader.setContentEncoding("UTF-8");


        Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);

        rabbitTemplate.send("","队列spring-监听器-红米",messageHeaders);
    }
15、 MessageConverter自定义接收数据类型

MessageConverter自定义接收数据类型的代码:

package com.study.rabbitmq.p10TestSpringListenerAdaper;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

/**
 * @author 武帅
 * @date 2020/6/7 16:18
 * @description 自定义文本转换器
 *      MessageConverter自定义接收数据类型
 *
 *      类型转换器(实现接口MessageConverter)
 */
public class MyMessageTestConverter implements MessageConverter {


    // java对象转换为Message对象
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        Message message = new Message(o.toString().getBytes(),messageProperties);
        return message;
    }


    // Message对象转换为java对象
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {

        String contentType = message.getMessageProperties().getContentType();
        if(contentType != null && contentType.contains("text")){
            String str = new String(message.getBody());
            return str;
        }else{
            //对于不是文本的类型,直接返回字节数组
            return message.getBody();
        }
    }
}

配置类代码:

    //使用监听适配器,针对不同队列的消息使用不同的方法,进行处理
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
        //把工厂传进来
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        //设置多个监听队列
        simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());

        //1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
        simpleMessageListenerContainer.setConcurrentConsumers(1);

        //创建多少个监听器,来监听消息队列
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);

        //设置收到消息后的确认模式  AcknowledgeMode.AUTO 表示对消息进行自动确认
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        //设置标签,然后台更加的认识它们
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){

            @Override
            public String createConsumerTag(String consumerTag) {

                //自己手写改动过后的
                consumerTag = consumerTag + "-" + num++;
                return consumerTag;

                //默认返回的就是,rabbitMq自动给咱们起好的名字
                //return consumerTag;
            }
        });

        //使用MessageListenerAdapter类进行消息接收并处理
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();

        //设置消息的转换器
        messageListenerAdapter.setMessageConverter(new MyMessageTestConverter());

        //new MyMessageDelegate() 这里使用自己的方法
        messageListenerAdapter.setDelegate(new MyMessageDelegate());

        //使用自己的类型转换器
        messageListenerAdapter.setDefaultListenerMethod("myHandleMessage");

        //进行绑定
        simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);

        return simpleMessageListenerContainer;
    }

处理方法的参数为转换过的字符串类型的代码:

    // 处理方法的参数为转换过的字符串类型
    public void myHandleMessage(String str){
        System.out.println( "处理方法的参数为转换过的字符串类型" + str);
    }

生产者的代码:

    @Test
    void testSendToXiaoMiAndHongMi(){

        //给Topic交换机上发送消息
        String sendSMSTopic = "一部小米手机";

        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");


        Message message = new Message(sendSMSTopic.getBytes(),properties);

        rabbitTemplate.send("","队列spring-监听器-小米",message);


        //给Header交换机上发送消息
        String sendSMSHeader = "一部红米手机";

        MessageProperties propertiesHeader = new MessageProperties();
        propertiesHeader.setContentEncoding("UTF-8");


        Message messageHeaders = new Message(sendSMSHeader.getBytes(),propertiesHeader);

        rabbitTemplate.send("","队列spring-监听器-红米",messageHeaders);
    }
16、添加json支持-转换普通对象为JSON字符串并发送到MQ

在pom.xml中需要添加jackson的引用,才能够实现把自定义的对象转换为json字符串

<!--json支持的依赖-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

实体类代码:

package com.study.rabbitmq.p10TestSpringListenerAdaper;

/**
 * @author 武帅
 * @date 2020/6/7 16:56
 * @description
 */
public class MyOrder {

    private Integer id;
    private String name;
    private float price;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "MyOrder{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", price=" + price +
                '}';
    }
}

配置类代码(暂时先注释一下)

   //使用监听适配器,针对不同队列的消息使用不同的方法,进行处理
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
        //把工厂传进来
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        //设置多个监听队列
        simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());

        //1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
        simpleMessageListenerContainer.setConcurrentConsumers(1);

        //创建多少个监听器,来监听消息队列
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);

        //设置收到消息后的确认模式  AcknowledgeMode.AUTO 表示对消息进行自动确认
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        //设置标签,然后台更加的认识它们
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){

            @Override
            public String createConsumerTag(String consumerTag) {

                //自己手写改动过后的
                consumerTag = consumerTag + "-" + num++;
                return consumerTag;

                //默认返回的就是,rabbitMq自动给咱们起好的名字
                //return consumerTag;
            }
        });

        //使用MessageListenerAdapter类进行消息接收并处理
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();

        //设置消息的转换器
        messageListenerAdapter.setMessageConverter(new MyMessageTestConverter());

        //new MyMessageDelegate() 这里使用自己的方法
        messageListenerAdapter.setDelegate(new MyMessageDelegate());

        //使用自己的类型转换器
        messageListenerAdapter.setDefaultListenerMethod("myHandleMessage");

        //进行绑定
        simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);

       return simpleMessageListenerContainer;
    }

测试类代码:

    @Test
    void testSendOrderToXiaoMi() throws JsonProcessingException {


        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");
        //变成json字符串
        properties.setContentType("application/json");

        MyOrder myOrder = new MyOrder();
        myOrder.setId(1);
        myOrder.setName("一部最新的小米手机");
        myOrder.setPrice(1999);

        //把普通对象变成JSON格式的字符串
        ObjectMapper objectMapper = new ObjectMapper();
        String jsonStr = objectMapper.writeValueAsString(myOrder);

        System.out.println( "====================" + jsonStr);

        Message message = new Message(jsonStr.getBytes(), properties);
        rabbitTemplate.send("","队列spring-监听器-小米",message);

    }
17、Jackson2JsonMessageConverter接收并处理Map类型的消息

配置类代码:

  //使用监听适配器,针对不同队列的消息使用不同的方法,进行处理
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
        //把工厂传进来
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        //设置多个监听队列
        simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());

        //1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
        simpleMessageListenerContainer.setConcurrentConsumers(1);

        //创建多少个监听器,来监听消息队列
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);

        //设置收到消息后的确认模式  AcknowledgeMode.AUTO 表示对消息进行自动确认
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        //设置标签,然后台更加的认识它们
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){

            @Override
            public String createConsumerTag(String consumerTag) {

                //自己手写改动过后的
                consumerTag = consumerTag + "-" + num++;
                return consumerTag;

                //默认返回的就是,rabbitMq自动给咱们起好的名字
                //return consumerTag;
            }
        });

        //使用MessageListenerAdapter类进行消息接收并处理
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();


        //Jackson2JsonMessageConverter接收并处理Map类型的消息
        Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();

        //设置消息的转换器
        messageListenerAdapter.setMessageConverter(jsonMessageConverter);

        //new MyMessageDelegate() 这里使用自己的方法
        messageListenerAdapter.setDelegate(new MyMessageDelegate());

        //使用自己的类型转换器 默认使用反射,寻找方法名
        messageListenerAdapter.setDefaultListenerMethod("myHandleMap");

        //进行绑定
        simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);

        return simpleMessageListenerContainer;
    }

处理方法的参数为转换过的Map类型

// 处理方法的参数为转换过的Map类型
    public void myHandleMap(Map map){
        System.out.println( "处理方法的参数为转换过的Map类型" + map);
    }

测试类代码:

   @Test
    void testSendOrderToXiaoMi() throws JsonProcessingException {


        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");
        //变成json字符串
        properties.setContentType("application/json");

        MyOrder myOrder = new MyOrder();
        myOrder.setId(1);
        myOrder.setName("一部最新的小米手机");
        myOrder.setPrice(1999);

        //把普通对象变成JSON格式的字符串
        ObjectMapper objectMapper = new ObjectMapper();
        String jsonStr = objectMapper.writeValueAsString(myOrder);

        //System.out.println( "====================" + jsonStr);

        Message message = new Message(jsonStr.getBytes(), properties);
        rabbitTemplate.send("","队列spring-监听器-小米",message);

    }
18、使用JavaTypeMapper直接接收并处理Java对象

配置类代码:

//使用监听适配器,直接处理传过来的Java对象
       @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
        //把工厂传进来
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        //设置多个监听队列
        simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());

        //1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
        simpleMessageListenerContainer.setConcurrentConsumers(1);

        //创建多少个监听器,来监听消息队列
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);

        //设置收到消息后的确认模式  AcknowledgeMode.AUTO 表示对消息进行自动确认
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        //设置标签,然后台更加的认识它们
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){

            @Override
            public String createConsumerTag(String consumerTag) {

                //自己手写改动过后的
                consumerTag = consumerTag + "-" + num++;
                return consumerTag;

                //默认返回的就是,rabbitMq自动给咱们起好的名字
                //return consumerTag;
            }
        });

        //使用MessageListenerAdapter类进行消息接收并处理
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();


        //Jackson2JsonMessageConverter接收并处理Map类型的消息
        Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();

        DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();

        //信任所有的包 进行反射
        defaultJackson2JavaTypeMapper.setTrustedPackages("*");

        //进行映射
        jsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);


        //设置消息的转换器
        messageListenerAdapter.setMessageConverter(jsonMessageConverter);

        //new MyMessageDelegate() 这里使用自己的方法
        messageListenerAdapter.setDelegate(new MyMessageDelegate());

        //使用自己的类型转换器 默认使用反射,寻找方法名
        messageListenerAdapter.setDefaultListenerMethod("myHandleJavaObject");

        //进行绑定
        simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);

        return simpleMessageListenerContainer;
    }

​ 处理方法的参数为转换过的Java类型代码:

public void myHandleJavaObject(MyOrder myOrder){
    System.out.println( "处理方法的参数为转换过的Java类型" + myOrder);
}

测试类代码:

 @Test
    void testSendJavaClassObjectToXiaoMi() throws JsonProcessingException {


        MessageProperties properties = new MessageProperties();
        properties.setContentEncoding("UTF-8");
        //变成json字符串
        properties.setContentType("application/json");
        //把Java的类对象转换为Json在转换为字节数组
        properties.getHeaders().put("__TypeId__", "com.study.rabbitmq.p10TestSpringListenerAdaper.MyOrder");


        MyOrder myOrder = new MyOrder();
        myOrder.setId(1);
        myOrder.setName("一部最新的小米手机-Java对象");
        myOrder.setPrice(1999);

        //把普通对象变成JSON格式的字符串
        ObjectMapper objectMapper = new ObjectMapper();
        String jsonStr = objectMapper.writeValueAsString(myOrder);

        //System.out.println( "====================" + jsonStr);

        Message message = new Message(jsonStr.getBytes(), properties);
        rabbitTemplate.send("","队列spring-监听器-小米",message);

    }
19、全局转换器-处理多种类型的消息-1

根据内容类型自定义转换器,需要使用全局消息转换器

ContentTypeDelegatingMessageConverter

配置类代码:

//使用全局转换器,对不同类型的数据进行定向转换
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,Queue queueSMS){
        //把工厂传进来
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        //设置多个监听队列
        simpleMessageListenerContainer.setQueues(queueSMS,queueXiaoMi(),queueHongMi());

        //1个监听器,可以监听多少个队列 (一般设置为1,不能设置为0)
        simpleMessageListenerContainer.setConcurrentConsumers(1);

        //创建多少个监听器,来监听消息队列
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);

        //设置收到消息后的确认模式  AcknowledgeMode.AUTO 表示对消息进行自动确认
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        //设置标签,然后台更加的认识它们
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy(){

            @Override
            public String createConsumerTag(String consumerTag) {

                //自己手写改动过后的
                consumerTag = consumerTag + "-" + num++;
                return consumerTag;

                //默认返回的就是,rabbitMq自动给咱们起好的名字
                //return consumerTag;
            }
        });

        //使用MessageListenerAdapter类进行消息接收并处理
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();


        //Jackson2JsonMessageConverter接收并处理Map类型的消息
        Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();

        //创建全局转换器,对不同类型的数据进行定向转换
        ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
        //根据不同的内容创建不同的转换器
        converter.addDelegate("my-jpg", new MyMessageJPGConverter());
        converter.addDelegate("my-pdf", new MyMessagePDFConverter());

        //设置消息的转换器
        messageListenerAdapter.setMessageConverter(converter);

        //new MyMessageDelegate() 这里使用自己的方法
        messageListenerAdapter.setDelegate(new MyMessageDelegate());

        //使用自己的类型转换器 默认使用反射,寻找方法名
        messageListenerAdapter.setDefaultListenerMethod("myHandleFile");

        //进行绑定
        simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);

        return simpleMessageListenerContainer;
    }

处理方法的参数为转换过的File类型的代码

    // 处理方法的参数为转换过的File类型
    public void myHandleFile(File file){
        System.out.println( "处理方法的参数为转换过的File类型" + file.getName());
    }

读取文件的二进制流发送到中间件的代码:

//读取文件的二进制流发送到中间件
    @Test
    void testSendFileByteToXiaoMi() throws Exception {

        //处理图片
        MessageProperties properties = new MessageProperties();
        //设置自己的类型
        properties.setContentType("my-jpg");
        properties.getHeaders().put("extName","jpg");

        //读取文件的二进制
        File file = new File("E:\\其他文件\\SiKi\\资料RabbitMQ\\dog.jpg");

        byte[] bytes = Files.readAllBytes(file.toPath());

        Message message = new Message(bytes, properties);
        rabbitTemplate.send("","队列spring-监听器-小米",message);


        //处理PDF文件
        MessageProperties propertiesPDF = new MessageProperties();
        //设置自己的类型
        propertiesPDF.setContentType("my-pdf");
        propertiesPDF.getHeaders().put("extName","pdf");

        //读取文件的二进制
        File filePDF = new File("E:\\其他文件\\SiKi\\资料RabbitMQ\\snake.pdf");

        byte[] bytesPDF = Files.readAllBytes(filePDF.toPath());

        Message messagePDF = new Message(bytesPDF, propertiesPDF);
        rabbitTemplate.send("","队列spring-监听器-小米",messagePDF);

    }

​ 根据不同的内容创建不同的转换器的代码

    package com.study.rabbitmq.p10TestSpringListenerAdaper;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;

/**
 * @author 武帅
 * @date 2020/6/7 16:18
 * @description 自定义图片转换器
 *      MessageConverter自定义接收数据类型
 *
 *      类型转换器(实现接口MessageConverter)
 */
public class MyMessagePDFConverter implements MessageConverter {


    // java对象转换为Message对象
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        Message message = new Message(o.toString().getBytes(),messageProperties);
        return message;
    }



    @Override
    public Object fromMessage(Message message) {

        //获取扩展名
        String extName = message.getMessageProperties().getHeaders().get("extName").toString();

        //做文件路径的拼接
        String path = "E:/" + UUID.randomUUID().toString() + "." + extName;

        //进行文件的读写
        File file = new File(path);

        try {
            Files.copy(new ByteArrayInputStream(message.getBody()),file.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return file;
    }
}

根据不同的内容创建不同的转换器的代码

package com.study.rabbitmq.p10TestSpringListenerAdaper;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;

/**
 * @author 武帅
 * @date 2020/6/7 16:18
 * @description 自定义PDF转换器
 *      MessageConverter自定义接收数据类型
 *
 *      类型转换器(实现接口MessageConverter)
 */
public class MyMessagePDFConverter implements MessageConverter {


    // java对象转换为Message对象
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        Message message = new Message(o.toString().getBytes(),messageProperties);
        return message;
    }



    @Override
    public Object fromMessage(Message message) {

        //获取扩展名
        String extName = message.getMessageProperties().getHeaders().get("extName").toString();

        //做文件路径的拼接
        String path = "E:/" + UUID.randomUUID().toString() + "." + extName;

        //进行文件的读写
        File file = new File(path);

        try {
            Files.copy(new ByteArrayInputStream(message.getBody()),file.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return file;
    }
}
20、使用SpringBoot的配置文件和注解进行收发消息 (这里类似于SpringBoot整合RabbitMq)

​ 配置Application.properties

​ 配置后,就可以直接使用RabbitTemplate的实力化bean了,连自己实例化bean都省了

第一步:

​ 创建两个工程(这个就不多说了,很简单啊)

第二步:

​ 使用配置文件连接消息中间件

#SpringBoot连接rabbitmq的信息

#SpringBoot连接rabbitmq的地址 如果是本机则写localhost 如果是别的机器,则写它的IP地址
spring.rabbitmq.addresses=localhost

#SpringBoot连接rabbitmq的用户名 默认是guest
spring.rabbitmq.username=guest

#SpringBoot连接rabbitmq的密码 默认是guest
spring.rabbitmq.password=guest

#SpringBoot连接rabbitmq的虚拟机 /表示根目录
spring.rabbitmq.virtual-host=/

#SpringBoot连接rabbitmq的超时时间 以毫秒为单位 这里先设为10秒(如果10秒内还没有连接到rabbitmq的话,那就退出连接)
spring.rabbitmq.connection-timeout=10000
21、注入模版对象发送消息-携带相关数据ID (以下所有的标题和代码内容都和上一个内容有关)

​ 这里的上一个内容为20标题

​ 创建配置类的代码:

package com.study.rabbitmq;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
 * @author 武帅
 * @date 2020/6/9 15:56
 * @description 生产者的配置类
 *      @Configuration  添加了此注解 则表示这个类为配置类(用于实例化需要的Bean)
 *      @ComponentScan({"com.study.rabbitmq.*"})  添加了此注解 则表示要扫描这个包下所有的组件类,并且实例化
 */
@Configuration
@ComponentScan({"com.study.rabbitmq.*"})
public class MyConfigProducer {


}

创建生产者类的代码:

package com.study.rabbitmq;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;

/**
 * @author 武帅
 * @date 2020/6/9 16:01
 * @description  @Component 添加此注解 则表示自动实例化,变成组件类
 */
@Component
public class MySendComponent {

    //Spring整合的RabbitMq  Spring对它进行了优化,而创造出来的类RabbitTemplate 是个模板
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * @Author: 武帅
     * @Date: 2020-06-09
     * @Description:
     * @param object 表示 需要发送的实际内容
     * @param map    表示 需要携带的特殊信息
     *@return: void
     */
    public void send(Object object, Map<String,Object> map){

        //这里导入的包是org.springframework.messaging.MessageHeaders
        MessageHeaders messageHeaders = new MessageHeaders(map);

        //这里导入的包是org.springframework.messaging.Message
        Message message = MessageBuilder.createMessage(object,messageHeaders);

        //创建唯一的UUID
        String uuId = UUID.randomUUID().toString();

        //需要传入一个字符串 保证全局唯一
        //作用为: 如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
        CorrelationData correlationData = new CorrelationData(uuId);

        //发送消息
        //第一个参数为交换机的名字
        //第一个参数为交换机的路由键
        //第三个参数为要发送的数据
        //第四个参数为要携带相关数据的ID 用于:如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
        rabbitTemplate.convertAndSend("交换机spring短信","发送短信",message,correlationData);

        //应该还有一步 就是发送完短信,因该把UUID插入到数据库中。
    }


}
22、确认模式

发送消息代码:

package com.study.rabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;


@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private MySendComponent mySendComponent;


    @Test
    void test() {
        System.out.println("hello");
    }

    @Test
    void sendStringToMQ(){

        //需要发送的数据
        String str = "hello world";

        //需要携带的特殊信息
        Map<String,Object> map = new HashMap<>();
        map.put("收货人姓名","Jack");
        map.put("收货人地址","北京");
        map.put("收货人电话","110");

        mySendComponent.send(str,map);
    }




}

往application.properties文件中添加了其它的配置文件代码:

#SpringBoot整合rabbitmq的确认模式 默认是none
spring.rabbitmq.publisher-confirm-type=correlated

#SpringBoot整合rabbitmq的返回模式 默认是false
spring.rabbitmq.publisher-returns=true

#默认是false 则表示不通知,直接扔掉, 如果是true的话,则表示投递出错,通知生产者
spring.rabbitmq.template.mandatory=true

往MySendComponent文件中添加了其它的代码:

package com.study.rabbitmq;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;

/**
 * @author 武帅
 * @date 2020/6/9 16:01
 * @description  @Component 添加此注解 则表示自动实例化,变成组件类
 */
@Component
public class MySendComponent {

    //Spring整合的RabbitMq  Spring对它进行了优化,而创造出来的类RabbitTemplate 是个模板
    @Autowired
    RabbitTemplate rabbitTemplate;

    //回调函数
    RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack == false){
                System.out.println("出错了,发送了Mq出现了问题,此时因该更新数据库,失败的原因:" + cause
                + "错误的ID为:" + correlationData);
            }else{
                System.out.println("发送成功,原因为:" + cause + "Id为:" + correlationData);
            }
        }
    };

    /**
     * @Author: 武帅
     * @Date: 2020-06-09
     * @Description:
     * @param object 表示 需要发送的实际内容
     * @param map    表示 需要携带的特殊信息
     *@return: void
     */
    public void send(Object object, Map<String,Object> map){

        //这里导入的包是org.springframework.messaging.MessageHeaders
        MessageHeaders messageHeaders = new MessageHeaders(map);

        //这里导入的包是org.springframework.messaging.Message
        Message message = MessageBuilder.createMessage(object,messageHeaders);

        //创建唯一的UUID
        String uuId = UUID.randomUUID().toString();

        //需要传入一个字符串 保证全局唯一
        //作用为: 如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
        CorrelationData correlationData = new CorrelationData(uuId);


        //回调函数 如果出错,则调用此函数
        rabbitTemplate.setConfirmCallback(confirmCallback);


        //发送消息
        //第一个参数为交换机的名字
        //第一个参数为交换机的路由键
        //第三个参数为要发送的数据 这里暂时没有数据,数据由外边的类来提供
        //第四个参数为要携带相关数据的ID 用于:如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
        rabbitTemplate.convertAndSend("交换机spring短信","发送短信",message,correlationData);

        //应该还有一步 就是发送完短信,因该把UUID插入到数据库中。
    }


}

23、返回模式

​ 发送消息代码: 和上面确认模式的一样

​ application.properties文件: 和上面确认模式的一样

​ 往MySendComponent文件中添加了其它的代码:

package com.study.rabbitmq;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;

/**
 * @author 武帅
 * @date 2020/6/9 16:01
 * @description  @Component 添加此注解 则表示自动实例化,变成组件类
 */
@Component
public class MySendComponent {

    //Spring整合的RabbitMq  Spring对它进行了优化,而创造出来的类RabbitTemplate 是个模板
    @Autowired
    RabbitTemplate rabbitTemplate;

    //回调函数
    RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack == true){
                System.out.println("出错了,发送了Mq出现了问题,此时因该更新数据库,失败的原因:" + cause
                        + "错误的ID为:" + correlationData);
            }else{
                System.out.println("发送成功,原因为:" + cause + "Id为:" + correlationData);
            }
        }
    };


    RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("replyCode :" + replyCode );
            System.out.println("replyText :" + replyText );
            System.out.println("exchange :" + exchange );
            System.out.println("routingKey :" + routingKey );
            System.out.println("message :" + message );
        }
    };


    /**
     * @Author: 武帅
     * @Date: 2020-06-09
     * @Description:
     * @param object 表示 需要发送的实际内容
     * @param map    表示 需要携带的特殊信息
     *@return: void
     */
    public void send(Object object, Map<String,Object> map){

        //这里导入的包是org.springframework.messaging.MessageHeaders
        MessageHeaders messageHeaders = new MessageHeaders(map);

        //这里导入的包是org.springframework.messaging.Message
        Message message = MessageBuilder.createMessage(object,messageHeaders);

        //创建唯一的UUID
        String uuId = UUID.randomUUID().toString();

        //需要传入一个字符串 保证全局唯一
        //作用为: 如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
        CorrelationData correlationData = new CorrelationData(uuId);


        //回调函数 如果出错,则调用此函数
        rabbitTemplate.setConfirmCallback(confirmCallback);

        //返回模式
        rabbitTemplate.setReturnCallback(returnCallback);

        //发送消息
        //第一个参数为交换机的名字
        //第一个参数为交换机的路由键
        //第三个参数为要发送的数据 这里暂时没有数据,数据由外边的类来提供
        //第四个参数为要携带相关数据的ID 用于:如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
        rabbitTemplate.convertAndSend("交换机spring短信","发送短信",message,correlationData);

        //应该还有一步 就是发送完短信,因该把UUID插入到数据库中。
    }


}

24、使用RabbitListener注解消费数据并手动确认

@RabbitListener是一个组合注解,可以组合其它注解一起来完成对队列的消息接收和监听,主要包括@QueueBinding、@Queue、@Exchange

此时是在rabbitmqConsumer工程里写代码

application.properties文件的代码

#SpringBoot连接rabbitmq的信息

#SpringBoot连接rabbitmq的地址 如果是本机则写localhost 如果是别的机器,则写它的IP地址
spring.rabbitmq.addresses=localhost

#SpringBoot连接rabbitmq的用户名 默认是guest
spring.rabbitmq.username=guest

#SpringBoot连接rabbitmq的密码 默认是guest
spring.rabbitmq.password=guest

#SpringBoot连接rabbitmq的虚拟机 /表示根目录
spring.rabbitmq.virtual-host=/

#SpringBoot连接rabbitmq的超时时间 以毫秒为单位 这里先设为10秒(如果10秒内还没有连接到rabbitmq的话,那就退出连接)
spring.rabbitmq.connection-timeout=10000

#rabbitmq的手动确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual

#rabbitmq的监听对象的数量
spring.rabbitmq.listener.simple.concurrency=1

#SpringBoot为我们创建5个对象进行同时监听
spring.rabbitmq.listener.simple.max-concurrency=5



配置类的代码:

package com.study.rabbitmq;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
 * @author 武帅
 * @date 2020/6/11 11:03
 * @description @ComponentScan({"com.study.rabbitmq.*"}) 表示扫描包下所有的类
 */
@Configuration
@ComponentScan({"com.study.rabbitmq.*"})
public class MyConfigConsumer {



}

组件类的代码

package com.study.rabbitmq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 武帅
 * @date 2020/6/11 11:05
 * @description     @Component 表示这是一个组件类.会被自动的实例化,进行一个注入
 */
@Component
public class MyRecvComponent {

    //需要引入 import org.springframework.messaging.Message;
    //需要引入 import com.rabbitmq.client.Channel;
    @RabbitListener(
        //queues = {"队列spring短信"} 监听的队列
        queues = {"队列spring短信"}
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel){
        System.out.println("收到了: " + message.getPayload());

        //获取消息唯一Id
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

        //因为现在是手动确认模式,所以要设置basicAck
        //第一个参数 包裹的一个标签 是RabbitMq为我们自动添加的唯一Id
        //第二个参数 不自动批量
        try {
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
25、使用RabbitListener复合注解 创建队列交换机和绑定

application.properties文件的代码 和上面的一样

配置类的代码: 和上面的一样

组件类的代码:

package com.study.rabbitmq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 武帅
 * @date 2020/6/11 11:05
 * @description     @Component 表示这是一个组件类.会被自动的实例化,进行一个注入
 */
@Component
public class MyRecvComponent {

    //需要引入 import org.springframework.messaging.Message;
    //需要引入 import com.rabbitmq.client.Channel;
    @RabbitListener(
        //queues = {"队列spring短信"} 监听的队列
        queues = {"队列spring短信"}
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel){
        System.out.println("收到了: " + message.getPayload());

        //获取消息唯一Id
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

        //因为现在是手动确认模式,所以要设置basicAck
        //第一个参数 包裹的一个标签 是RabbitMq为我们自动添加的唯一Id
        //第二个参数 不自动批量
        try {
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }



    // =========================================================================================

    //需要引入 import org.springframework.messaging.Message;
    //需要引入 import com.rabbitmq.client.Channel;
    //使用复合注解创建队列
    @RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "队列-注解创建的",declare = "true",exclusive = "false",autoDelete = "false"),
                exchange = @Exchange(value = "交换机-注解创建的",durable = "true", type = "direct"),
                key = "新路由键"
        )
    )
    @RabbitHandler
    public void onMessageNewQueue(Message message, Channel channel){
        System.out.println("NewQue收到了: " + message.getPayload());

        //获取消息唯一Id
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

        //因为现在是手动确认模式,所以要设置basicAck
        //第一个参数 包裹的一个标签 是RabbitMq为我们自动添加的唯一Id
        //第二个参数 不自动批量
        try {
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

26、使用注解直接发送和接收java自定义对象

​ 要求:实体类必须实现接口 Serializable

​ 实体类最好定义 序列号ID

1、首先在生产端定义一个实体类

package com.study.rabbitmq;

import java.io.Serializable;

/**
 * @author 武帅
 * @date 2020/6/12 15:35
 * @description
 */
public class MyUser implements Serializable {

    //随便给一个UID即可
    private static final long serialVersionUID = 870292457463066666L;

    private String userName;
    private Integer age;
    private long exd;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public long getExd() {
        return exd;
    }

    public void setExd(long exd) {
        this.exd = exd;
    }

    public MyUser() { }

    public MyUser(String userName, Integer age, long exd) {
        this.userName = userName;
        this.age = age;
        this.exd = exd;
    }

}

2、然后再组件类里添加其余的代码:

public void sendMyUser(MyUser user){
        //回调函数 如果出错,则调用此函数
        rabbitTemplate.setConfirmCallback(confirmCallback);

        //返回模式
        rabbitTemplate.setReturnCallback(returnCallback);

        //创建唯一的UUID
        String uuId = UUID.randomUUID().toString();

        //需要传入一个字符串 保证全局唯一
        //作用为: 如果消息没有发送成功,可以把这个Id给你返回回来,让你快速的查询到是哪条消息出错了。
        CorrelationData correlationData = new CorrelationData(uuId);

        rabbitTemplate.convertAndSend("交换机-注解创建的","新路由键",user,correlationData);

    }

3、测试类里添加其余的代码:

 @Test
    void SendUserToMQ(){
        MyUser user = new MyUser("张三",18,2000L);
        mySendComponent.sendMyUser(user);
    }

4、在切换为消费端

5、在消费端直接粘贴生产端的实体类

6、在组件类里添加其余的代码:

//需要引入 import org.springframework.messaging.Message;
    //需要引入 import com.rabbitmq.client.Channel;
    @RabbitListener(
            //queues = {"队列spring短信"} 监听的队列
            queues = {"队列-注解创建的"}
    )
    @RabbitHandler
    public void onMessageUser(@Payload MyUser user, Channel channel, @Headers Map<String,Object> headers){
        System.out.println("收到了: " + user.getUserName());
        System.out.println("收到了: " + user.getAge());
        System.out.println("收到了: " + user.getExd());

        //获取消息唯一Id
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

        //因为现在是手动确认模式,所以要设置basicAck
        //第一个参数 包裹的一个标签 是RabbitMq为我们自动添加的唯一Id
        //第二个参数 不自动批量
        try {
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

好了到此RabbitMQ的基础部分就结束了

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容