SpringBoot 之八:异步消息处理

  很多时候,我们不需要实时地处理消息,可以把事情放一放再处理,比如工作比较忙碌无法实时处理的,可以通过邮件通知,邮件接收方有空时再去看看邮箱里有什么事情要处理,这解耦了消息发送方和接收方的通信,使得忙碌一方可以从容不迫处理事情。
  在系统架构上,消息中间件提供了类似的功能,常用的 MQ 有使用 JMS 的 ActiveMQ、使用 AMQP 协议的 RabbitMQ、吞吐能力惊人的开源的 Kafka、阿里开源 RocketMQ。

1、使用 JMS

(1)搭建 Active Artemis 环境

Ⅰ、安装

  安装包下载(找到与自己使用的客户端匹配的版本):ActiveMQ (apache.org)
  将 apache-artemis-xxx-bin.zip 解压到安装目录,设置环境变量 ARTEMIS_HOME ,值为解压目录。

  添加执行目录到 PATH 环境参数中



Ⅱ、创建 broker 实例

  使用以下命令创建 broker(并设定用户密码):

artemis create brokername

  在当前命令行目录下会创建一个对应的目录


Ⅲ、启停 broker

  切换到生成的 broker 目录下的 bin 子目录,执行以下命令即可启动该 broker:

artemis run

  启动后访问控制台

http://localhost:8161/console

  使用一开始创建 broker 时设定的账号密码登录,登入后的控制台菜单和页面如下

  停止 broker

artemis stop


Ⅳ、发送接收消息

  在${ARTEMIS_HOME}\examples目录下,自带了非常多的使用示例,可以选取其中的示例来验证发送消息、接收消息功能。

  比如,选取了“examples\features\standard\queue”这个例子,在Broker实例启动的情况下,执行以下命令来运行示例

mvn -PnoServer verify

  或者在没有Broker实例启动的情况下,执行以下命令来运行示例

mvn verify

  运行完成之后,可以看到控制台输出内容如下:

  可以看到示例程序已经能够成功发送和接收到消息“This is a text message”了。

(2)JmsTemplate

  首先要引入 JMS Starter 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-artemis</artifactId>
</dependency>

  引入该依赖后,SpringBoot 会在启动时自动配置一个 JmsTemplate,可以将它注入到程序中,并用它来发送和接收消息。

  JmsTemplate 是 Spring JMS 集成支持的核心。与 Spring 的其他面向模板的组件非常相似,JmsTemplate 消除了大量与 JMS 协同工作所需的样板代码。如果没有 JmsTemplate,将需要编写代码来创建与消息代理的连接和会话,并编写更多代码来处理在发送消息过程中可能抛出的任何异常。JmsTemplate 专注于真正想做的事情:发送消息。

Ⅰ、发送消息

  JmsTemplate 有几个发送消息的有用方法,包括:JmsTemplate 有几个发送消息的有用方法,包括:

// 发送原始消息
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;
// 发送转换自对象的消息
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;
// 发送经过处理后从对象转换而来的消息
void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;

  实际上只有两个方法,send() 和 convertAndSend(),每个方法都被重载以支持不同的参数。如果仔细观察,会发现 convertAndSend() 的各种形式可以分为两个子类。在试图理解所有这些方法的作用时,请考虑以下细分:

  • send() 方法需要一个 MessageCreator 来制造一个 Message 对象。
  • convertAndSend() 方法接受一个 Object,并在后台自动将该 Object 转换为一条 Message。
  • 三种 convertAndSend() 方法会自动将一个 Object 转换成一条 Message,但也会接受一个 MessagePostProcessor,以便在 Message 发送前对其进行定制。

  此外,这三个方法类别中的每一个都由三个重载的方法组成,它们是通过指定 JMS 目的地(队列或主题)的方式来区分的:

  • 一个方法不接受目的地参数,并将消息发送到默认目的地。
  • 一个方法接受指定消息目的地的目标对象。
  • 一个方法接受一个 String,该 String 通过名称指定消息的目的地。

  使用 send() 方法发送消息:

private JmsTemplate jms;

private void useSend(Order order) {
    // 写法1:使用MessageCreator的匿名内部类写法
    /*jms.send(new MessageCreator() {           
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createObjectMessage(order);
        }
    });*/
    
    // 写法2:函数式接口匿名内部类的简化写法,lambda表达式,默认为 spring.jms.template.default-destination 参数设置的队列
    jms.send(session -> session.createObjectMessage(order));
    
    // 写法3:指定队列(使用Destination、直接使用字符串)
    jms.send(orderQueue,
             session -> session.createObjectMessage(order));
    jms.send("tacocloud.order.queue",
             session -> session.createObjectMessage(order));    
    
    jms.send("tacocloud.order.queue", new MessageCreator() {            
        @Override
        public Message createMessage(Session session) throws JMSException {
            Message message = session.createObjectMessage(order);
            message.setStringProperty("X_ORDER_SOURCE", "WEB");
            return message;
        }
    }); 
    jms.send("tacocloud.order.queue", session -> {
            Message message = session.createObjectMessage(order);
            message.setStringProperty("X_ORDER_SOURCE", "WEB");
            return message;         
    });     
                
}

  使用 useConvertAndSend() 方法发送消息:

private void useConvertAndSend(Order order) {
    jms.convertAndSend(order);
    jms.convertAndSend(orderQueue, order);
    jms.convertAndSend("tacocloud.order.queue", order);         
    // 在发送数据之前修改底层的Message对象
    jms.convertAndSend("tacocloud.order.queue", order, new MessagePostProcessor() {         
        @Override
        public Message postProcessMessage(Message message) throws JMSException {
            message.setStringProperty("X_ORDER_SOURCE", "WEB");
            return message;
        }
    });
    
    // lambda
    jms.convertAndSend("tacocloud.order.queue", order, message -> {
            message.setStringProperty("X_ORDER_SOURCE", "WEB");
            return message;         
    });     
    
    // 方法引用
    jms.convertAndSend("tacocloud.order.queue", order, this::addOrderSource);           
}

private Message addOrderSource(Message message) throws JMSException {
    message.setStringProperty("X_ORDER_SOURCE", "WEB");
    return message;             
}

  编写 Jms 服务

package tacos.messaging;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.stereotype.Service;

import tacos.Order;

@Service
public class JmsOrderMessagingService implements OrderMessagingService {

    private JmsTemplate jms;
    private Destination orderQueue;
    
    @Autowired
    public JmsOrderMessagingService(JmsTemplate jms, Destination orderQueue) {      
        this.jms = jms;
        this.orderQueue = orderQueue;
    }

    @Override
    public void sendOrder(Order order) {
        jms.convertAndSend("tacocloud.order.queue", order, this::addOrderSource);   
    }
}

  编写使用 Jms 服务的控制器:

package tacos.messaging;

import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;
import tacos.Order;

@RestController
@RequestMapping("/jms")
@Slf4j
public class JmsController {
    
    @Autowired
    JmsOrderMessagingService jmsService;

    @GetMapping("/convertAndSend/order")
    public String convertAndSendOrder() {
        log.info("----------------------- SEND ORDER -------------------------");
        Order order = buildOrder();
        log.info("Ready to send order: " + order);
        jmsService.sendOrder(order);
        log.info("Send order success!");
        return "Convert and send order";
    }

    private Order buildOrder() {
        Order order = new Order();
        order.setId(1L);
        order.setUserId(1L);
        order.setDeliveryName("huyihao");
        order.setDeliveryState("广东省");          
        order.setDeliveryCity("广州市");
        order.setDeliveryStreet("荔湾区石围塘街道");
        order.setDeliveryZip("515100");
        order.setCcNumber("6226890243871067");
        order.setCcExpiration("12/23");
        order.setCcCvv("282");
        order.setPlacedAt(new Date());
        
        return order;
    }
}

  发起请求测试:


Ⅱ、接收消息

  在消息消费的时候,可以选择拉取模式(pull mode)推送模式(push mode),前者会在我们的代码中请求消息并一直等待直到消息到达为止,而后者则会在消息可用的时候自动在代码中执行。

  JmsTemplate 提供了几种接收消息的方法,但它们都使用拉模型。调用其中一个方法来请求消息,线程会发生阻塞,直到消息可用为止(可能是立即可用,也可能需要一段时间)。

  另一方面,还可以选择使用推模型,在该模型中,定义了一个消息监听器,它在消息可用时被调用。

  这两个选项都适用于各种用例。人们普遍认为推模型是最佳选择,因为它不会阻塞线程。但是在某些用例中,如果消息到达得太快,侦听器可能会负担过重。拉模型允许使用者声明他们已经准备好处理新消息。

  JmsTemplate 提供多个用于拉模式的方法,包括以下这些:

Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;

Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;

  可以看到,这 6 个方法是 JmsTemplate 中的 send() 和 convertAndSend() 方法的镜像。receive() 方法接收原始消息,而 receiveAndConvert() 方法使用配置的消息转换器将消息转换为域类型。对于其中的每一个,可以指定 Destination 或包含目的地名称的 String,也可以从缺省目的地获取一条消息。

  定义一个订单接收接口:

package tacos.messaging;

import javax.jms.JMSException;

import org.springframework.jms.support.converter.MessageConversionException;

import tacos.Order;

public interface OrderReceiver {
    public Order receiveOrder() throws MessageConversionException, JMSException ;
}

  定义一个实现类并注册为Spring服务:

package tacos.messaging;

import javax.jms.JMSException;
import javax.jms.Message;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;
import tacos.Order;

@Service
@Slf4j
public class JmsOrderReceiver implements OrderReceiver {
    private JmsTemplate jms;
    private MessageConverter converter;
    
    public JmsOrderReceiver(JmsTemplate jms, MessageConverter converter) {      
        this.jms = jms;
        this.converter = converter;
    }

    @Override
    public Order receiveOrder() throws MessageConversionException, JMSException {
        Message message = jms.receive("tacocloud.order.queue");
        String source = message.getStringProperty("X_ORDER_SOURCE");
        log.info("This order is from " + source);
        return (Order) converter.fromMessage(message);
    }
}

  在控制器中新增响应请求方法:

@GetMapping("/receiveAndConvert/order")
public Order receiveAndConvertOrder() {
    log.info("----------------------- RECEIVE ORDER -------------------------");
    Order order = null;
    try {
        order = jmsReceiver.receiveOrder();
    } catch (MessageConversionException e) {
        log.error("Convert message exception", e);
        e.printStackTrace();
    } catch (JMSException e) {
        log.error("Receive message exception", e);
        e.printStackTrace();
    }
    log.info("Send order success!");
    return order;
}

  测试:

  消息监听器是推送模式接收消息,在消息到达之前,它会一直处于空闲状态,消息到达时接收消息处理。

  要定义一个监听器需要使用 @JmsListener 注解,下面定义一个 Spring 组件,并且定义一个使用了该注解的接收订单的方法:

package tacos.messaging;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;
import tacos.Order;

@Component
@Slf4j
public class OrderListener {

    @JmsListener(destination = "tacocloud.order.queue")
    public void receiveOrder(Order order) {
        log.info("----------------------- LISTENER RECEIVE ORDER -------------------------");
        log.info("Reveive a order: " + order);
    }
}

  监听器接收到消息时会打印响应的日志。

  发一笔对 /jms/convertAndSend/order 的请求,以触发消息发送,若监听器生效,会自动接收消息和打印日志。

  可以看到监听器马上打印出了相应的日志,证明起效果了。


2、使用 RabbitMQ

(1)AMQP 协议

  RabbitMQ 使用 AMQP 协议,AMQP协议规定了路由消息必须有三个组成部分:交换器、绑定、队列,如下图所示:

  生产者将消息发布到交换器,交换器根据绑定规则,将消息分发到对应的队列,消息出队后被消费者消费。

  交换器是一个虚拟的用来接收、投递消息的中间构件。生产者将消息发布到交换器上,并声明绑定的路由规则,然后交换器根据绑定路由规则,将消息投递到满足绑定路由规则的队列上。

  举个例子,你写信给女朋友Alice,信件上写明要投递到某个地址上面去,然后将信件投递到邮箱,接着邮递员从邮箱中取邮件,根据你填写的地址将信件发到Alice的家中,这里邮箱、快递员、信件上的地址就组成了交换器的效果,邮箱负责接收信件、快递员负责发信件、信件地址是投递信件的规则。

  当然,绑定规则跟信件地址并不是完全相等的关系,参考上面的“发后即忘”模型图。

  RabbitMQ中有三种常用的交换器,分别是direct、fanout、topic,还有一种headers交换器,允许匹配AMQP消息的header而非路由键,但其性能差因此几乎不会用到。

direct交换器

  direct交换器是直接路由键匹配,如图所示:

  RabbitMQ默认实现了一个名字为空字符串的direct交换器,当声明一个队列时,默认会绑定到这个默认的交换器。



fanout 交换器

  fanout交换器会将收到的消息广播到绑定的队列上,比如应用程序中A模块完成之后必须触发B、C、D模块,并且B、C、D之前是并发的没有程序耦合关系,则适合对B、C、D分别声明一条队列来实现与A模块的关联,通过MQ,A模块无需显式地去调用BCD,实现了程序的解耦,这也是MQ存在的意义之一。



topic 交换器

  topic交换器是RabbitMQ中最灵活的交换器,由于队列支持对绑定关系进行模糊匹配,因此能够实现多种复杂的应用场景需求,下面以日志搜集为例,实现对不同日志级别维度的日志进行收集,如图所示:



(2)环境搭建

Ⅰ、安装Erlang

  RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang,下载地址:https://www.erlang.org/downloads

  双击 otp_win64_25.3.2.exe 直接安装即可。

  设置环境变量

  验证环境



Ⅱ、安装RabbitMQ

  下载地址:https://www.rabbitmq.com/install-windows.html,根据使用操作系统下载对应的安装包,windows是对应 rabbitmq-server-3.11.15.exe 文件,双击该可执行文件,按照默认配置完成安装即可。

  安装Web网页管理插件 RabbitMQ-Plugins,这个相当于是一个管理界面,方便我们在浏览器界面查看RabbitMQ各个消息队列以及exchange的工作情况。

  程序安装好之后默认情况下服务是开启的,这一点可以打开Windows的服务界面查看。

  打开开始菜单,选择RabbitMQ Server->RabbitMQ Service-stop命令,停止服务。

  此时再次打开Windows的服务管理界面,可以看见RabbitMQ服务已停止。

  打开 CMD 窗口,切换到安装目录下的 sbin 目录下

  输入命令:

rabbitmq-plugins enable rabbitmq_management

  安装完插件,开启服务,打开菜单,RabbitMQ Service Start

  成功开启服务之后,同样打开服务管理界面,查看是否已开启服务

  打开浏览器,访问:

http://localhost:15672/

  进入控制台登录页面,默认账户密码为:guest/guest

  登录成功之后,如下图所示:

  选择Queues页面,打开“Add a new queue”,为队列命名,如“MQ_Test”,其它选择可以默认,然后点击“Add queue”按钮,

  添加成功之后,在“All queues”选项下面会列出刚才创建的队列信息,

  在“All queues”列表中选择刚刚创建的队列“MQ_Test”,会显示“MQ_Test”的信息页面,主要显示了该队列的网络状态以及速率监控等,然后选择“Publish message”,在“Payload”中输入”Hello world!!!”,

  然后点击“Publish message”按钮,就可以发送消息,发送完之后在“Overview”中显示了实时的网络状态。

  然后选择“Get messages”下拉框,会弹出接收消息的显示界面,点击“Get message”按钮,接收到来自服务器的消息,也就是刚刚发送的“Hello world!!!”

  到此,可以说明RabbitMQ的安装均已正常,进一步操作请参考官网的操作文档:http://www.rabbitmq.com/documentation.html

(3)配置 Spring Rabbit

  引入对应的 Starter:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>           
</dependency>

  将 AMQP starter 添加到构建中将触发自动配置,该配置将创建 AMQP 连接工厂和 RabbitTemplate bean,以及其他支持组件。只需添加此依赖项,就可以开始使用 Spring 从 RabbitMQ broker 发送和接收消息。

  RabbitMQ 常用属性:

属性 描述
spring.rabbitmq.addresses 一个逗号分隔的 RabbitMQ Broker 地址列表
spring.rabbitmq.host Broker 主机(默认为localhost)
spring.rabbitmq.port Broker 端口(默认为5672)
spring.rabbitmq.username 访问 Broker 的用户名(可选)
spring.rabbitmq.password 访问 Broker 的密码(可选)

  在 application.yml 中配置为:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 配置默认交换器和路由key
    template:
      exchange: tacocloud.orders
      routing-key: kitchens.central     


(4)RabbitTemplate

  Spring 对 RabbitMQ 的支持是提供了 RabbitTemplate,类似于 JmsTemplate,工作方式略有差异。

  关于使用 RabbitTemplate 发送消息,send() 和 convertAndSend() 方法与来自 JmsTemplate 的同名方法并行。但是不同于 JmsTemplate 方法,它只将消息路由到给定的队列或主题,RabbitTemplate 方法根据交换和路由键发送消息。下面是一些用 RabbitTemplate 发送消息的最有用的方法:

// 发送原始消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;

// 发送从对象转换过来的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

// 发送经过处理后从对象转换过来的消息
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

  这些方法与 JmsTemplate 中的孪生方法遵循类似的模式。前三个 send() 方法都发送一个原始消息对象。接下来的三个 convertAndSend() 方法接受一个对象,该对象将在发送之前在后台转换为消息。最后三个 convertAndSend() 方法与前三个方法类似,但是它们接受一个 MessagePostProcessor,可以在消息对象发送到代理之前使用它来操作消息对象。

  这些方法与对应的 JmsTemplate 方法不同,它们接受 String 值来指定交换和路由键,而不是目的地名称(或 Destination 对象)。不接受交换的方法将把它们的消息发送到默认交换。同样,不接受路由键的方法将使用默认路由键路由其消息。


  使用 RabbitTemplate 发送消息与使用 JmsTemplate 发送消息差别不大。事实证明,从 RabbitMQ 队列接收消息与从 JMS 接收消息并没有太大的不同。

与 JMS 一样,有两个选择:

  • 使用 RabbitTemplate 从队列中拉取消息
  • 获取被推送到 @RabbitListener 注解的方法中的消息

RabbitTemplate 有多个从队列中拉取消息的方法,一部分最有用的方法如下所示RabbitTemplate 有多个从队列中拉取消息的方法,一部分最有用的方法如下所示:

// 接收消息
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;

// 接收从消息转换过来的对象
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

// 接收从消息转换过来的类型安全的对象
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;

  这些方法是前面描述的 send() 和 convertAndSend() 方法的镜像。send() 用于发送原始 Message 对象,而 receive() 从队列接收原始 Message 对象。同样地,receiveAndConvert() 接收消息,并在返回消息之前使用消息转换器将其转换为域对象。

  但是在方法签名方面有一些明显的区别。首先,这些方法都不以交换键或路由键作为参数。这是因为交换和路由键用于将消息路由到队列,但是一旦消息在队列中,它们的下一个目的地就是将消息从队列中取出的使用者。使用应用程序不需要关心交换或路由键,队列是在消费应用程序是仅仅需要知道一个东西。

  许多方法接受一个 long 参数来表示接收消息的超时。默认情况下,接收超时为 0 毫秒。也就是说,对 receive() 的调用将立即返回,如果没有可用的消息,则可能返回空值。这与 receive() 方法在 JmsTemplate 中的行为有明显的不同。通过传入超时值,可以让 receive() 和 receiveAndConvert() 方法阻塞,直到消息到达或超时过期。但是,即使使用非零超时,代码也要准备好处理返回的 null 值。

Ⅰ、发送消息

  定义一个消息接口:

package tacos.messaging;

import tacos.Order;

public interface OrderMessagingService {
    void sendOrder(Order order);
}

  定义一个实现接口的消息服务:

package tacos.messaging;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import tacos.Order;

@Service
public class RabbitOrderMessagingService implements OrderMessagingService {

    private RabbitTemplate rabbit;
    
    @Autowired
    public RabbitOrderMessagingService(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
    }
    
    @Override
    public void sendOrder(Order order) {
        // 1、显式使用消息转换器
        /*MessageConverter converter = rabbit.getMessageConverter();
        MessageProperties props = new MessageProperties();
        props.setHeader("X_ORDER_SOURCE", "WEB");
        Message message = converter.toMessage(order, props);
        rabbit.send("tacocloud.order", message);*/
        
        // 2、使用convertAndSend屏蔽消息转换细节
        //rabbit.convertAndSend("tacocloud.order", order);      
        
        // 3、使用convertAndSend时要设置消息属性只能通过MessagePostProcessor来处理消息细节
        rabbit.convertAndSend("tacocloud.order", order, new MessagePostProcessor() {        
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties props = message.getMessageProperties();
                props.setHeader("X_ORDER_SOURCE", "WEB");
                return message;
            }
        });
    }
}

  定义使用服务的控制器:

package tacos.messaging;

import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;
import tacos.Order;

@RestController
@RequestMapping("/rabbit")
@Slf4j
public class RabbitController {

    @Autowired
    RabbitOrderMessagingService rabbitService;
    
    @GetMapping("/convertAndSend/order")
    public String convertAndSendOrder() {
        log.info("----------------------- SEND ORDER -------------------------");
        Order order = buildOrder();
        log.info("Ready to send order: " + order);
        rabbitService.sendOrder(order);
        log.info("Send order success!");
        return "Convert and send order";
    }
    
    private Order buildOrder() {
        Order order = new Order();
        order.setId(1L);
        order.setUserId(1L);
        order.setDeliveryName("huyihao");
        order.setDeliveryState("广东省");          
        order.setDeliveryCity("广州市");
        order.setDeliveryStreet("荔湾区石围塘街道");
        order.setDeliveryZip("515100");
        order.setCcNumber("6226890243871067");
        order.setCcExpiration("12/23");
        order.setCcCvv("282");
        order.setPlacedAt(new Date());
        
        return order;
    }   
}

  使用 postman 对 /rabbit/convertAndSend/order 发起请求,因为未指定 exchange,所以默认是 yml 中 spring.rabbit.template.exchange 属性配置的 tacocloud.orders

  当前不存在这个交换器,所以先创建该交换器,类型为 topic

  交换器要有绑定规则绑定到相应的队列,先创建队列 order、tacocloud

  在交换器上设定绑定规则,当 Routing key 与 *.order 匹配时消息会发送到 order 队列,与 tacocloud.* 匹配时消息会被发送到 tacocloud 队列。

  在程序里设定的 Routing key 是 tacocloud.order ,所以发送一条消息会同时被发送到上述两个队列中。

  发起测试,从 Rabbit 网页控制台上可以看到

  两个队列都收到了消息




Ⅱ、接收消息

  定义一个接收器:

@Component
public class RabbitOrderReceiver {

    private RabbitTemplate rabbit;
    private MessageConverter converter;
    
    @Autowired
    public RabbitOrderReceiver(RabbitTemplate rabbit, MessageConverter converter) {     
        this.rabbit = rabbit;
        this.converter = converter;
    }
    
    public Order receiveOrder() {
        Message message = rabbit.receive("order");
        Order order = null;
        if (message != null) {
            order = (Order) converter.fromMessage(message);
        } 
        return order;
    }
    
}

  控制器中新增一个处理方法:

@GetMapping("/receiveAndConvert/order")
public Order receiveAndConvertOrder() {
    log.info("----------------------- RECEIVE ORDER -------------------------");
    Order order = rabbitReceiver.receiveOrder();        
    log.info("Receive order success, order = " + order);
    return order;
}

  使用 postman 对 /receiveAndConvert/order 发起请求,可以看到接收的消息里有设置的属性

  查看 Rabbit Management,Order 队列中的消息数变成0,而 tacocloud 队列中的消息数依然为1。

  接收队列消息时队列中可能没有消息,有时可能想在没消息时再等一下,receive()receiveAndConvert() 都提供了带超时时间参数的方法,修改代码为如下:

public Order receiveOrder() {
    //Message message = rabbit.receive("order");
    
    // 带超时时间的消息接收
    /*Message message = rabbit.receive("order", 30000);
    Order order = null;
    if (message != null) {
        order = (Order) converter.fromMessage(message);
    }*/
    
    // receiveAndConvert 的另一种写法
    //Order order = (Order) rabbit.receiveAndConvert("order", 30000, new ParameterizedTypeReference<Order>() {});
    
    log.info("Start to receive order!");
    Order order = (Order) rabbit.receiveAndConvert("order", 30000);
    log.info("Receive order end, succ? " + (order != null));
    return order;
}

  发起接收消息的测试,因为 order 队列中没有消息,所以在 30s 超时时间内会被阻塞,此时在发起发送消息的请求,order 队列里就会新增一条消息,马上被阻塞的消息消费方所消费。

  tacocloud 队列因为又一次收到消息并且没被消费,所以消息数量会变为 2,而 order 队列依然为 0。

  有时,我们需要的是有个后台程序一直在监听队列,只要队列里有消息就消费,这种场景可以使用监听器的功能,Spring 提供了 RabbitListener,相当于 RabbitMQ 中的 JmsListener。要指定当消息到达 RabbitMQ 队列时应该调用某个方法,请在相应的 bean 方法上使用 @RabbitTemplate 进行注解 。

  启动程序,监听器会接收 tacocloud 队列里的 2 条消息。




3、使用 Kafka

(1)kafka 的介绍和环境安装

  高性能分布式消息系统 —— Kafka


(2)启动环境

  查看对应端口进程是否已启动


(3)配置 Spring Kafka

  Spring 没有针对 kafka 的 Spring Boot Starter,添加以下依赖即可:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>           
</dependency>

  这个依赖项将 Kafka 所需的一切都带到项目中。更重要的是,它的存在将触发 Kafka 的 Spring Boot 自动配置,它将在 Spring 应用程序上下文中生成一个 KafkaTemplate。你所需要做的就是注入 KafkaTemplate 并开始发送和接收消息。

  在开始发送和接收消息之前,应该了解一些在使用 Kafka 时会派上用场的属性。具体来说就是,KafkaTemplate 默认在 localhost 上运行 Kafka broker,并监听 9092 端口。在开发应用程序时,在本地启动 Kafka broker 是可以的,但是在进入生产环境时,需要配置不同的主机和端口。

  在 yml 中配置如下:

spring:
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: tacocloud.orders.topic    

  spring.kafka.bootstrap-servers 属性是复数形式,它接受一个列表。因此,可以在集群中为它提供多个 Kafka 服务器。

(4)KafkaTemplate

  在许多方面,KafkaTemplate 与 JMS 和 RabbitMQ 类似。与此同时,它也是不同的,尤其是在我们考虑它发送消息的方法时。

ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

  注意到的第一件事是没有 convertAndSend() 方法。这是因为 KafkaTemplate 是用的泛型,同时能够在发送消息时直接处理域类型。在某种程度上,所有的 send() 方法都在做 convertAndSend() 的工作。

  再者 send() 和 sendDefault() 的参数,它们与 JMS 和 Rabbit 中使用的参数完全不同。当使用 Kafka 发送消息时,可以指定以下参数来指导如何发送消息:

  • 发送消息的 topic(send() 方法必要的参数)
  • 写入 topic 的分区(可选)
  • 发送记录的键(可选)
  • 时间戳(可选;默认为 System.currentTimeMillis())
  • payload(必须)

  topic 和 payload 是两个最重要的参数。分区和键对如何使用 KafkaTemplate 几乎没有影响,除了作为 send() 和 sendDefault() 的参数用于提供额外信息。出于我们的目的,我们将把重点放在将消息有效负载发送到给定主题上,而不考虑分区和键。

  对于 send() 方法,还可以选择发送一个 ProducerRecord,它与在单个对象中捕获所有上述参数的类型差不多。也可以发送 Message 对象,但是这样做需要将域对象转换为 Message。通常,使用其他方法比创建和发送 ProducerRecord 或 Message 对象更容易。

Ⅰ、发送消息

  定义一个服务:

package tacos.messaging;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;
import tacos.Order;

@Service
@Slf4j
public class KafkaOrderMessagingService implements OrderMessagingService {

    private KafkaTemplate<String, Order> kafkaTemplate;
        
    @Autowired
    public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {     
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void sendOrder(Order order) {
        log.info("Ready to send message!");
        kafkaTemplate.send("tacocloud.orders.topic", order);
        // 不指定主题使用yml配置的默认主题
        //kafkaTemplate.sendDefault(order);
        log.info("Ready to send message!");
    }

}

  定义一个控制器:

package tacos.messaging;

import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;
import tacos.Order;

@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {

    @Autowired
    private KafkaOrderMessagingService kafkaService;
    
    @GetMapping("/convertAndSend/order")
    public String convertAndSendOrder() {
        log.info("----------------------- SEND ORDER -------------------------");
        Order order = buildOrder();
        log.info("Ready to send order: " + order);
        kafkaService.sendOrder(order);
        log.info("Send order success!");
        return "Convert and send order";
    }
    
    private Order buildOrder() {
        Order order = new Order();
        order.setId(1L);
        order.setUserId(1L);
        order.setDeliveryName("huyihao");
        order.setDeliveryState("广东省");          
        order.setDeliveryCity("广州市");
        order.setDeliveryStreet("荔湾区石围塘街道");
        order.setDeliveryZip("515100");
        order.setCcNumber("6226890243871067");
        order.setCcExpiration("12/23");
        order.setCcCvv("282");
        order.setPlacedAt(new Date());
        
        return order;
    }
}

  使用 postman 对 /kafka/convertAndSend/order 发起测试请求,会发生以下报错:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class tacos.Order to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

  因为 Kafka 默认的键值序列化器都是 StringSerializer,对于自定义的领域对象,需要自定义序列化器:

package tacos.messaging;

import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

import tacos.Order;

public class OrderSerializer implements  Serializer<Order> {

    @Override
    public byte[] serialize(String topic, Order data) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsString(data).getBytes();
            System.out.println("Using OrderSerializer!!!");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return retVal;
    }

}

  并且要在配置文件中指定:

spring:
    producer:      
      acks: 1
      value-serializer: tacos.messaging.OrderSerializer

  再次发起请求,可以看到 Kafka 自动创建了队列,默认只有一个分区,偏移量是1,证明只有一条数据,即刚才测试发送的消息。


Ⅱ、接收消息

  除了 send() 和 sendDefault() 的惟一方法签名之外,KafkaTemplate 与 JmsTemplate 和 RabbitTemplate 的不同之处在于它不提供任何接收消息的方法。这意味着使用 Spring 消费来自 Kafka 主题的消息的唯一方法是编写消息监听器。

  对于 Kafka,消息监听器被定义为被 @KafkaListener 注解的方法。@KafkaListener 注解大致类似于 @JmsListener 和 @RabbitListener,其使用方式大致相同。

  编写一个 Kafka 监听器:

package tacos.messaging;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;
import tacos.Order;

@Component
@Slf4j
public class OrderListener {

    @KafkaListener(topics = {"tacocloud.orders.topic"})
    public void receiveOrder(Order order) {
        log.info("----------------------- LISTENER RECEIVE ORDER -------------------------");
        log.info("Reveive a order: " + order);
    }
    
}

  接收消息同样要经过反序列化的过程,需要定义一个反序列化器:

package tacos.messaging;

import java.nio.charset.StandardCharsets;
import java.util.Objects;

import org.apache.kafka.common.serialization.Deserializer;

import com.alibaba.fastjson.JSON;

import tacos.Order;

public class OrderDeserializer implements Deserializer<Order> {

    @Override
    public Order deserialize(String topic, byte[] data) {
        if (Objects.isNull(data)) {
            return null;
        }

        String orderStr = new String(data, StandardCharsets.UTF_8);
        return JSON.parseObject(orderStr, Order.class);
    }

}

  同样接收消息需要做一些消费者配置:

spring:
    consumer:
      bootstrap-servers:
      - localhost:9092
      auto-offset-reset: latest
      group-id: kafkaListener
      value-deserializer: tacos.messaging.OrderDeserializer

  autoOffsetReset 参数配置的值为 latest,表示不消费 topic 里已经存在的消息,因此程序启动后,得先发消息,监听器才会消费消息。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容