很多时候,我们不需要实时地处理消息,可以把事情放一放再处理,比如工作比较忙碌无法实时处理的,可以通过邮件通知,邮件接收方有空时再去看看邮箱里有什么事情要处理,这解耦了消息发送方和接收方的通信,使得忙碌一方可以从容不迫处理事情。
在系统架构上,消息中间件提供了类似的功能,常用的 MQ 有使用 JMS 的 ActiveMQ、使用 AMQP 协议的 RabbitMQ、吞吐能力惊人的开源的 Kafka、阿里开源 RocketMQ。
1、使用 JMS
(1)搭建 Active Artemis 环境
Ⅰ、安装
安装包下载(找到与自己使用的客户端匹配的版本):ActiveMQ (apache.org)
将 apache-artemis-xxx-bin.zip 解压到安装目录,设置环境变量 ARTEMIS_HOME
,值为解压目录。
添加执行目录到 PATH 环境参数中
Ⅱ、创建 broker 实例
使用以下命令创建 broker(并设定用户密码):
artemis create brokername
在当前命令行目录下会创建一个对应的目录
Ⅲ、启停 broker
切换到生成的 broker 目录下的 bin 子目录,执行以下命令即可启动该 broker:
artemis run
启动后访问控制台
http://localhost:8161/console
使用一开始创建 broker 时设定的账号密码登录,登入后的控制台菜单和页面如下
停止 broker
artemis stop
Ⅳ、发送接收消息
在${ARTEMIS_HOME}\examples
目录下,自带了非常多的使用示例,可以选取其中的示例来验证发送消息、接收消息功能。
比如,选取了“examples\features\standard\queue”这个例子,在Broker实例启动的情况下,执行以下命令来运行示例
mvn -PnoServer verify
或者在没有Broker实例启动的情况下,执行以下命令来运行示例
mvn verify
运行完成之后,可以看到控制台输出内容如下:
可以看到示例程序已经能够成功发送和接收到消息“This is a text message”了。
(2)JmsTemplate
首先要引入 JMS Starter 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
引入该依赖后,SpringBoot 会在启动时自动配置一个 JmsTemplate,可以将它注入到程序中,并用它来发送和接收消息。
JmsTemplate 是 Spring JMS 集成支持的核心。与 Spring 的其他面向模板的组件非常相似,JmsTemplate 消除了大量与 JMS 协同工作所需的样板代码。如果没有 JmsTemplate,将需要编写代码来创建与消息代理的连接和会话,并编写更多代码来处理在发送消息过程中可能抛出的任何异常。JmsTemplate 专注于真正想做的事情:发送消息。
Ⅰ、发送消息
JmsTemplate 有几个发送消息的有用方法,包括:JmsTemplate 有几个发送消息的有用方法,包括:
// 发送原始消息
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;
// 发送转换自对象的消息
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;
// 发送经过处理后从对象转换而来的消息
void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
实际上只有两个方法,send() 和 convertAndSend(),每个方法都被重载以支持不同的参数。如果仔细观察,会发现 convertAndSend() 的各种形式可以分为两个子类。在试图理解所有这些方法的作用时,请考虑以下细分:
- send() 方法需要一个 MessageCreator 来制造一个 Message 对象。
- convertAndSend() 方法接受一个 Object,并在后台自动将该 Object 转换为一条 Message。
- 三种 convertAndSend() 方法会自动将一个 Object 转换成一条 Message,但也会接受一个 MessagePostProcessor,以便在 Message 发送前对其进行定制。
此外,这三个方法类别中的每一个都由三个重载的方法组成,它们是通过指定 JMS 目的地(队列或主题)的方式来区分的:
- 一个方法不接受目的地参数,并将消息发送到默认目的地。
- 一个方法接受指定消息目的地的目标对象。
- 一个方法接受一个 String,该 String 通过名称指定消息的目的地。
使用 send() 方法发送消息:
private JmsTemplate jms;
private void useSend(Order order) {
// 写法1:使用MessageCreator的匿名内部类写法
/*jms.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(order);
}
});*/
// 写法2:函数式接口匿名内部类的简化写法,lambda表达式,默认为 spring.jms.template.default-destination 参数设置的队列
jms.send(session -> session.createObjectMessage(order));
// 写法3:指定队列(使用Destination、直接使用字符串)
jms.send(orderQueue,
session -> session.createObjectMessage(order));
jms.send("tacocloud.order.queue",
session -> session.createObjectMessage(order));
jms.send("tacocloud.order.queue", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
Message message = session.createObjectMessage(order);
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
});
jms.send("tacocloud.order.queue", session -> {
Message message = session.createObjectMessage(order);
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
});
}
使用 useConvertAndSend() 方法发送消息:
private void useConvertAndSend(Order order) {
jms.convertAndSend(order);
jms.convertAndSend(orderQueue, order);
jms.convertAndSend("tacocloud.order.queue", order);
// 在发送数据之前修改底层的Message对象
jms.convertAndSend("tacocloud.order.queue", order, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
});
// lambda
jms.convertAndSend("tacocloud.order.queue", order, message -> {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
});
// 方法引用
jms.convertAndSend("tacocloud.order.queue", order, this::addOrderSource);
}
private Message addOrderSource(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
编写 Jms 服务
package tacos.messaging;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.stereotype.Service;
import tacos.Order;
@Service
public class JmsOrderMessagingService implements OrderMessagingService {
private JmsTemplate jms;
private Destination orderQueue;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms, Destination orderQueue) {
this.jms = jms;
this.orderQueue = orderQueue;
}
@Override
public void sendOrder(Order order) {
jms.convertAndSend("tacocloud.order.queue", order, this::addOrderSource);
}
}
编写使用 Jms 服务的控制器:
package tacos.messaging;
import java.util.Date;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
import tacos.Order;
@RestController
@RequestMapping("/jms")
@Slf4j
public class JmsController {
@Autowired
JmsOrderMessagingService jmsService;
@GetMapping("/convertAndSend/order")
public String convertAndSendOrder() {
log.info("----------------------- SEND ORDER -------------------------");
Order order = buildOrder();
log.info("Ready to send order: " + order);
jmsService.sendOrder(order);
log.info("Send order success!");
return "Convert and send order";
}
private Order buildOrder() {
Order order = new Order();
order.setId(1L);
order.setUserId(1L);
order.setDeliveryName("huyihao");
order.setDeliveryState("广东省");
order.setDeliveryCity("广州市");
order.setDeliveryStreet("荔湾区石围塘街道");
order.setDeliveryZip("515100");
order.setCcNumber("6226890243871067");
order.setCcExpiration("12/23");
order.setCcCvv("282");
order.setPlacedAt(new Date());
return order;
}
}
发起请求测试:
Ⅱ、接收消息
在消息消费的时候,可以选择拉取模式(pull mode)和推送模式(push mode),前者会在我们的代码中请求消息并一直等待直到消息到达为止,而后者则会在消息可用的时候自动在代码中执行。
JmsTemplate 提供了几种接收消息的方法,但它们都使用拉模型。调用其中一个方法来请求消息,线程会发生阻塞,直到消息可用为止(可能是立即可用,也可能需要一段时间)。
另一方面,还可以选择使用推模型,在该模型中,定义了一个消息监听器,它在消息可用时被调用。
这两个选项都适用于各种用例。人们普遍认为推模型是最佳选择,因为它不会阻塞线程。但是在某些用例中,如果消息到达得太快,侦听器可能会负担过重。拉模型允许使用者声明他们已经准备好处理新消息。
JmsTemplate 提供多个用于拉模式的方法,包括以下这些:
Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;
可以看到,这 6 个方法是 JmsTemplate 中的 send() 和 convertAndSend() 方法的镜像。receive() 方法接收原始消息,而 receiveAndConvert() 方法使用配置的消息转换器将消息转换为域类型。对于其中的每一个,可以指定 Destination 或包含目的地名称的 String,也可以从缺省目的地获取一条消息。
定义一个订单接收接口:
package tacos.messaging;
import javax.jms.JMSException;
import org.springframework.jms.support.converter.MessageConversionException;
import tacos.Order;
public interface OrderReceiver {
public Order receiveOrder() throws MessageConversionException, JMSException ;
}
定义一个实现类并注册为Spring服务:
package tacos.messaging;
import javax.jms.JMSException;
import javax.jms.Message;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import tacos.Order;
@Service
@Slf4j
public class JmsOrderReceiver implements OrderReceiver {
private JmsTemplate jms;
private MessageConverter converter;
public JmsOrderReceiver(JmsTemplate jms, MessageConverter converter) {
this.jms = jms;
this.converter = converter;
}
@Override
public Order receiveOrder() throws MessageConversionException, JMSException {
Message message = jms.receive("tacocloud.order.queue");
String source = message.getStringProperty("X_ORDER_SOURCE");
log.info("This order is from " + source);
return (Order) converter.fromMessage(message);
}
}
在控制器中新增响应请求方法:
@GetMapping("/receiveAndConvert/order")
public Order receiveAndConvertOrder() {
log.info("----------------------- RECEIVE ORDER -------------------------");
Order order = null;
try {
order = jmsReceiver.receiveOrder();
} catch (MessageConversionException e) {
log.error("Convert message exception", e);
e.printStackTrace();
} catch (JMSException e) {
log.error("Receive message exception", e);
e.printStackTrace();
}
log.info("Send order success!");
return order;
}
测试:
消息监听器是推送模式接收消息,在消息到达之前,它会一直处于空闲状态,消息到达时接收消息处理。
要定义一个监听器需要使用 @JmsListener
注解,下面定义一个 Spring 组件,并且定义一个使用了该注解的接收订单的方法:
package tacos.messaging;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import tacos.Order;
@Component
@Slf4j
public class OrderListener {
@JmsListener(destination = "tacocloud.order.queue")
public void receiveOrder(Order order) {
log.info("----------------------- LISTENER RECEIVE ORDER -------------------------");
log.info("Reveive a order: " + order);
}
}
监听器接收到消息时会打印响应的日志。
发一笔对 /jms/convertAndSend/order
的请求,以触发消息发送,若监听器生效,会自动接收消息和打印日志。
可以看到监听器马上打印出了相应的日志,证明起效果了。
2、使用 RabbitMQ
(1)AMQP 协议
RabbitMQ 使用 AMQP 协议,AMQP协议规定了路由消息必须有三个组成部分:交换器、绑定、队列,如下图所示:
生产者将消息发布到交换器,交换器根据绑定规则,将消息分发到对应的队列,消息出队后被消费者消费。
交换器是一个虚拟的用来接收、投递消息的中间构件。生产者将消息发布到交换器上,并声明绑定的路由规则,然后交换器根据绑定路由规则,将消息投递到满足绑定路由规则的队列上。
举个例子,你写信给女朋友Alice,信件上写明要投递到某个地址上面去,然后将信件投递到邮箱,接着邮递员从邮箱中取邮件,根据你填写的地址将信件发到Alice的家中,这里邮箱、快递员、信件上的地址就组成了交换器的效果,邮箱负责接收信件、快递员负责发信件、信件地址是投递信件的规则。
当然,绑定规则跟信件地址并不是完全相等的关系,参考上面的“发后即忘”模型图。
RabbitMQ中有三种常用的交换器,分别是direct、fanout、topic,还有一种headers交换器,允许匹配AMQP消息的header而非路由键,但其性能差因此几乎不会用到。
direct交换器
direct交换器是直接路由键匹配,如图所示:
RabbitMQ默认实现了一个名字为空字符串的direct交换器,当声明一个队列时,默认会绑定到这个默认的交换器。
fanout 交换器
fanout交换器会将收到的消息广播到绑定的队列上,比如应用程序中A模块完成之后必须触发B、C、D模块,并且B、C、D之前是并发的没有程序耦合关系,则适合对B、C、D分别声明一条队列来实现与A模块的关联,通过MQ,A模块无需显式地去调用BCD,实现了程序的解耦,这也是MQ存在的意义之一。
topic 交换器
topic交换器是RabbitMQ中最灵活的交换器,由于队列支持对绑定关系进行模糊匹配,因此能够实现多种复杂的应用场景需求,下面以日志搜集为例,实现对不同日志级别维度的日志进行收集,如图所示:
(2)环境搭建
Ⅰ、安装Erlang
RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang,下载地址:https://www.erlang.org/downloads
双击 otp_win64_25.3.2.exe
直接安装即可。
设置环境变量
验证环境
Ⅱ、安装RabbitMQ
下载地址:https://www.rabbitmq.com/install-windows.html,根据使用操作系统下载对应的安装包,windows是对应 rabbitmq-server-3.11.15.exe
文件,双击该可执行文件,按照默认配置完成安装即可。
安装Web网页管理插件 RabbitMQ-Plugins,这个相当于是一个管理界面,方便我们在浏览器界面查看RabbitMQ各个消息队列以及exchange的工作情况。
程序安装好之后默认情况下服务是开启的,这一点可以打开Windows的服务界面查看。
打开开始菜单,选择RabbitMQ Server->RabbitMQ Service-stop命令,停止服务。
此时再次打开Windows的服务管理界面,可以看见RabbitMQ服务已停止。
打开 CMD 窗口,切换到安装目录下的 sbin 目录下
输入命令:
rabbitmq-plugins enable rabbitmq_management
安装完插件,开启服务,打开菜单,RabbitMQ Service Start
成功开启服务之后,同样打开服务管理界面,查看是否已开启服务
打开浏览器,访问:
http://localhost:15672/
进入控制台登录页面,默认账户密码为:guest/guest
登录成功之后,如下图所示:
选择Queues页面,打开“Add a new queue”,为队列命名,如“MQ_Test”,其它选择可以默认,然后点击“Add queue”按钮,
添加成功之后,在“All queues”选项下面会列出刚才创建的队列信息,
在“All queues”列表中选择刚刚创建的队列“MQ_Test”,会显示“MQ_Test”的信息页面,主要显示了该队列的网络状态以及速率监控等,然后选择“Publish message”,在“Payload”中输入”Hello world!!!”,
然后点击“Publish message”按钮,就可以发送消息,发送完之后在“Overview”中显示了实时的网络状态。
然后选择“Get messages”下拉框,会弹出接收消息的显示界面,点击“Get message”按钮,接收到来自服务器的消息,也就是刚刚发送的“Hello world!!!”
到此,可以说明RabbitMQ的安装均已正常,进一步操作请参考官网的操作文档:http://www.rabbitmq.com/documentation.html
(3)配置 Spring Rabbit
引入对应的 Starter:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
将 AMQP starter 添加到构建中将触发自动配置,该配置将创建 AMQP 连接工厂和 RabbitTemplate bean,以及其他支持组件。只需添加此依赖项,就可以开始使用 Spring 从 RabbitMQ broker 发送和接收消息。
RabbitMQ 常用属性:
属性 | 描述 |
---|---|
spring.rabbitmq.addresses | 一个逗号分隔的 RabbitMQ Broker 地址列表 |
spring.rabbitmq.host | Broker 主机(默认为localhost) |
spring.rabbitmq.port | Broker 端口(默认为5672) |
spring.rabbitmq.username | 访问 Broker 的用户名(可选) |
spring.rabbitmq.password | 访问 Broker 的密码(可选) |
在 application.yml 中配置为:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 配置默认交换器和路由key
template:
exchange: tacocloud.orders
routing-key: kitchens.central
(4)RabbitTemplate
Spring 对 RabbitMQ 的支持是提供了 RabbitTemplate,类似于 JmsTemplate,工作方式略有差异。
关于使用 RabbitTemplate 发送消息,send() 和 convertAndSend() 方法与来自 JmsTemplate 的同名方法并行。但是不同于 JmsTemplate 方法,它只将消息路由到给定的队列或主题,RabbitTemplate 方法根据交换和路由键发送消息。下面是一些用 RabbitTemplate 发送消息的最有用的方法:
// 发送原始消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
// 发送从对象转换过来的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
// 发送经过处理后从对象转换过来的消息
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
这些方法与 JmsTemplate 中的孪生方法遵循类似的模式。前三个 send() 方法都发送一个原始消息对象。接下来的三个 convertAndSend() 方法接受一个对象,该对象将在发送之前在后台转换为消息。最后三个 convertAndSend() 方法与前三个方法类似,但是它们接受一个 MessagePostProcessor,可以在消息对象发送到代理之前使用它来操作消息对象。
这些方法与对应的 JmsTemplate 方法不同,它们接受 String 值来指定交换和路由键,而不是目的地名称(或 Destination 对象)。不接受交换的方法将把它们的消息发送到默认交换。同样,不接受路由键的方法将使用默认路由键路由其消息。
使用 RabbitTemplate 发送消息与使用 JmsTemplate 发送消息差别不大。事实证明,从 RabbitMQ 队列接收消息与从 JMS 接收消息并没有太大的不同。
与 JMS 一样,有两个选择:
- 使用 RabbitTemplate 从队列中拉取消息
- 获取被推送到 @RabbitListener 注解的方法中的消息
RabbitTemplate 有多个从队列中拉取消息的方法,一部分最有用的方法如下所示RabbitTemplate 有多个从队列中拉取消息的方法,一部分最有用的方法如下所示:
// 接收消息
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// 接收从消息转换过来的对象
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
// 接收从消息转换过来的类型安全的对象
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
这些方法是前面描述的 send() 和 convertAndSend() 方法的镜像。send() 用于发送原始 Message 对象,而 receive() 从队列接收原始 Message 对象。同样地,receiveAndConvert() 接收消息,并在返回消息之前使用消息转换器将其转换为域对象。
但是在方法签名方面有一些明显的区别。首先,这些方法都不以交换键或路由键作为参数。这是因为交换和路由键用于将消息路由到队列,但是一旦消息在队列中,它们的下一个目的地就是将消息从队列中取出的使用者。使用应用程序不需要关心交换或路由键,队列是在消费应用程序是仅仅需要知道一个东西。
许多方法接受一个 long 参数来表示接收消息的超时。默认情况下,接收超时为 0 毫秒。也就是说,对 receive() 的调用将立即返回,如果没有可用的消息,则可能返回空值。这与 receive() 方法在 JmsTemplate 中的行为有明显的不同。通过传入超时值,可以让 receive() 和 receiveAndConvert() 方法阻塞,直到消息到达或超时过期。但是,即使使用非零超时,代码也要准备好处理返回的 null 值。
Ⅰ、发送消息
定义一个消息接口:
package tacos.messaging;
import tacos.Order;
public interface OrderMessagingService {
void sendOrder(Order order);
}
定义一个实现接口的消息服务:
package tacos.messaging;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tacos.Order;
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private RabbitTemplate rabbit;
@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
@Override
public void sendOrder(Order order) {
// 1、显式使用消息转换器
/*MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);*/
// 2、使用convertAndSend屏蔽消息转换细节
//rabbit.convertAndSend("tacocloud.order", order);
// 3、使用convertAndSend时要设置消息属性只能通过MessagePostProcessor来处理消息细节
rabbit.convertAndSend("tacocloud.order", order, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}
}
定义使用服务的控制器:
package tacos.messaging;
import java.util.Date;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
import tacos.Order;
@RestController
@RequestMapping("/rabbit")
@Slf4j
public class RabbitController {
@Autowired
RabbitOrderMessagingService rabbitService;
@GetMapping("/convertAndSend/order")
public String convertAndSendOrder() {
log.info("----------------------- SEND ORDER -------------------------");
Order order = buildOrder();
log.info("Ready to send order: " + order);
rabbitService.sendOrder(order);
log.info("Send order success!");
return "Convert and send order";
}
private Order buildOrder() {
Order order = new Order();
order.setId(1L);
order.setUserId(1L);
order.setDeliveryName("huyihao");
order.setDeliveryState("广东省");
order.setDeliveryCity("广州市");
order.setDeliveryStreet("荔湾区石围塘街道");
order.setDeliveryZip("515100");
order.setCcNumber("6226890243871067");
order.setCcExpiration("12/23");
order.setCcCvv("282");
order.setPlacedAt(new Date());
return order;
}
}
使用 postman 对 /rabbit/convertAndSend/order
发起请求,因为未指定 exchange,所以默认是 yml 中 spring.rabbit.template.exchange
属性配置的 tacocloud.orders 。
当前不存在这个交换器,所以先创建该交换器,类型为 topic
交换器要有绑定规则绑定到相应的队列,先创建队列 order、tacocloud
在交换器上设定绑定规则,当 Routing key 与 *.order
匹配时消息会发送到 order 队列,与 tacocloud.*
匹配时消息会被发送到 tacocloud 队列。
在程序里设定的 Routing key 是 tacocloud.order
,所以发送一条消息会同时被发送到上述两个队列中。
发起测试,从 Rabbit 网页控制台上可以看到
两个队列都收到了消息
Ⅱ、接收消息
定义一个接收器:
@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbit;
private MessageConverter converter;
@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbit, MessageConverter converter) {
this.rabbit = rabbit;
this.converter = converter;
}
public Order receiveOrder() {
Message message = rabbit.receive("order");
Order order = null;
if (message != null) {
order = (Order) converter.fromMessage(message);
}
return order;
}
}
控制器中新增一个处理方法:
@GetMapping("/receiveAndConvert/order")
public Order receiveAndConvertOrder() {
log.info("----------------------- RECEIVE ORDER -------------------------");
Order order = rabbitReceiver.receiveOrder();
log.info("Receive order success, order = " + order);
return order;
}
使用 postman 对 /receiveAndConvert/order
发起请求,可以看到接收的消息里有设置的属性
查看 Rabbit Management,Order 队列中的消息数变成0,而 tacocloud 队列中的消息数依然为1。
接收队列消息时队列中可能没有消息,有时可能想在没消息时再等一下,receive()
、receiveAndConvert()
都提供了带超时时间参数的方法,修改代码为如下:
public Order receiveOrder() {
//Message message = rabbit.receive("order");
// 带超时时间的消息接收
/*Message message = rabbit.receive("order", 30000);
Order order = null;
if (message != null) {
order = (Order) converter.fromMessage(message);
}*/
// receiveAndConvert 的另一种写法
//Order order = (Order) rabbit.receiveAndConvert("order", 30000, new ParameterizedTypeReference<Order>() {});
log.info("Start to receive order!");
Order order = (Order) rabbit.receiveAndConvert("order", 30000);
log.info("Receive order end, succ? " + (order != null));
return order;
}
发起接收消息的测试,因为 order 队列中没有消息,所以在 30s 超时时间内会被阻塞,此时在发起发送消息的请求,order 队列里就会新增一条消息,马上被阻塞的消息消费方所消费。
tacocloud 队列因为又一次收到消息并且没被消费,所以消息数量会变为 2,而 order 队列依然为 0。
有时,我们需要的是有个后台程序一直在监听队列,只要队列里有消息就消费,这种场景可以使用监听器的功能,Spring 提供了 RabbitListener,相当于 RabbitMQ 中的 JmsListener。要指定当消息到达 RabbitMQ 队列时应该调用某个方法,请在相应的 bean 方法上使用 @RabbitTemplate 进行注解 。
启动程序,监听器会接收 tacocloud 队列里的 2 条消息。
3、使用 Kafka
(1)kafka 的介绍和环境安装
(2)启动环境
查看对应端口进程是否已启动
(3)配置 Spring Kafka
Spring 没有针对 kafka 的 Spring Boot Starter,添加以下依赖即可:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
这个依赖项将 Kafka 所需的一切都带到项目中。更重要的是,它的存在将触发 Kafka 的 Spring Boot 自动配置,它将在 Spring 应用程序上下文中生成一个 KafkaTemplate。你所需要做的就是注入 KafkaTemplate 并开始发送和接收消息。
在开始发送和接收消息之前,应该了解一些在使用 Kafka 时会派上用场的属性。具体来说就是,KafkaTemplate 默认在 localhost 上运行 Kafka broker,并监听 9092 端口。在开发应用程序时,在本地启动 Kafka broker 是可以的,但是在进入生产环境时,需要配置不同的主机和端口。
在 yml 中配置如下:
spring:
kafka:
bootstrap-servers:
- localhost:9092
template:
default-topic: tacocloud.orders.topic
spring.kafka.bootstrap-servers 属性是复数形式,它接受一个列表。因此,可以在集群中为它提供多个 Kafka 服务器。
(4)KafkaTemplate
在许多方面,KafkaTemplate 与 JMS 和 RabbitMQ 类似。与此同时,它也是不同的,尤其是在我们考虑它发送消息的方法时。
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
注意到的第一件事是没有 convertAndSend() 方法。这是因为 KafkaTemplate 是用的泛型,同时能够在发送消息时直接处理域类型。在某种程度上,所有的 send() 方法都在做 convertAndSend() 的工作。
再者 send() 和 sendDefault() 的参数,它们与 JMS 和 Rabbit 中使用的参数完全不同。当使用 Kafka 发送消息时,可以指定以下参数来指导如何发送消息:
- 发送消息的 topic(send() 方法必要的参数)
- 写入 topic 的分区(可选)
- 发送记录的键(可选)
- 时间戳(可选;默认为 System.currentTimeMillis())
- payload(必须)
topic 和 payload 是两个最重要的参数。分区和键对如何使用 KafkaTemplate 几乎没有影响,除了作为 send() 和 sendDefault() 的参数用于提供额外信息。出于我们的目的,我们将把重点放在将消息有效负载发送到给定主题上,而不考虑分区和键。
对于 send() 方法,还可以选择发送一个 ProducerRecord,它与在单个对象中捕获所有上述参数的类型差不多。也可以发送 Message 对象,但是这样做需要将域对象转换为 Message。通常,使用其他方法比创建和发送 ProducerRecord 或 Message 对象更容易。
Ⅰ、发送消息
定义一个服务:
package tacos.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import tacos.Order;
@Service
@Slf4j
public class KafkaOrderMessagingService implements OrderMessagingService {
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
log.info("Ready to send message!");
kafkaTemplate.send("tacocloud.orders.topic", order);
// 不指定主题使用yml配置的默认主题
//kafkaTemplate.sendDefault(order);
log.info("Ready to send message!");
}
}
定义一个控制器:
package tacos.messaging;
import java.util.Date;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
import tacos.Order;
@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {
@Autowired
private KafkaOrderMessagingService kafkaService;
@GetMapping("/convertAndSend/order")
public String convertAndSendOrder() {
log.info("----------------------- SEND ORDER -------------------------");
Order order = buildOrder();
log.info("Ready to send order: " + order);
kafkaService.sendOrder(order);
log.info("Send order success!");
return "Convert and send order";
}
private Order buildOrder() {
Order order = new Order();
order.setId(1L);
order.setUserId(1L);
order.setDeliveryName("huyihao");
order.setDeliveryState("广东省");
order.setDeliveryCity("广州市");
order.setDeliveryStreet("荔湾区石围塘街道");
order.setDeliveryZip("515100");
order.setCcNumber("6226890243871067");
order.setCcExpiration("12/23");
order.setCcCvv("282");
order.setPlacedAt(new Date());
return order;
}
}
使用 postman 对 /kafka/convertAndSend/order
发起测试请求,会发生以下报错:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class tacos.Order to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
因为 Kafka 默认的键值序列化器都是 StringSerializer,对于自定义的领域对象,需要自定义序列化器:
package tacos.messaging;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import tacos.Order;
public class OrderSerializer implements Serializer<Order> {
@Override
public byte[] serialize(String topic, Order data) {
byte[] retVal = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
retVal = objectMapper.writeValueAsString(data).getBytes();
System.out.println("Using OrderSerializer!!!");
} catch (Exception e) {
e.printStackTrace();
}
return retVal;
}
}
并且要在配置文件中指定:
spring:
producer:
acks: 1
value-serializer: tacos.messaging.OrderSerializer
再次发起请求,可以看到 Kafka 自动创建了队列,默认只有一个分区,偏移量是1,证明只有一条数据,即刚才测试发送的消息。
Ⅱ、接收消息
除了 send() 和 sendDefault() 的惟一方法签名之外,KafkaTemplate 与 JmsTemplate 和 RabbitTemplate 的不同之处在于它不提供任何接收消息的方法。这意味着使用 Spring 消费来自 Kafka 主题的消息的唯一方法是编写消息监听器。
对于 Kafka,消息监听器被定义为被 @KafkaListener 注解的方法。@KafkaListener 注解大致类似于 @JmsListener 和 @RabbitListener,其使用方式大致相同。
编写一个 Kafka 监听器:
package tacos.messaging;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import tacos.Order;
@Component
@Slf4j
public class OrderListener {
@KafkaListener(topics = {"tacocloud.orders.topic"})
public void receiveOrder(Order order) {
log.info("----------------------- LISTENER RECEIVE ORDER -------------------------");
log.info("Reveive a order: " + order);
}
}
接收消息同样要经过反序列化的过程,需要定义一个反序列化器:
package tacos.messaging;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.kafka.common.serialization.Deserializer;
import com.alibaba.fastjson.JSON;
import tacos.Order;
public class OrderDeserializer implements Deserializer<Order> {
@Override
public Order deserialize(String topic, byte[] data) {
if (Objects.isNull(data)) {
return null;
}
String orderStr = new String(data, StandardCharsets.UTF_8);
return JSON.parseObject(orderStr, Order.class);
}
}
同样接收消息需要做一些消费者配置:
spring:
consumer:
bootstrap-servers:
- localhost:9092
auto-offset-reset: latest
group-id: kafkaListener
value-deserializer: tacos.messaging.OrderDeserializer
autoOffsetReset
参数配置的值为 latest,表示不消费 topic 里已经存在的消息,因此程序启动后,得先发消息,监听器才会消费消息。