一、RocketMQ的优势?
- 底层是Java实现的,于阅读源码、了解实现有利(RabbitMQ 底层是 Erlang,kafka 底层是 Scala)
- 能够保证严格的消息顺序
- 提供了丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级的消息堆积能力
二、整体流程
官方给出的 RocketMQ 架构图
- 启动 Namesrv,Namesrv起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心
- Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包(心跳包中,包含当前Broker信息-IP和端口等,以及存储所有Topic信息,注册成功后,Namesrv集群中就有Topic跟Broker 的映射关系)
- 收发消息前,先创建Topic。创建Topic时,需要指定该Topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
- Producer 发送消息。(启东时,先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。)
- Consumer消费消息。(Consumer跟Producer类似,跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。)
三、Docker搭建RocketMQ
rocketmq的docker镜像可以自己制作,官方文档中有详细介绍:rocketmq-docker
我找到了全网最快捷的搭建方式,使用foxiswho的镜像:foxiswho/rocketmq
在自己新建的rocketmq目录下打开终端,执行以下命令:
| git clone https://github.com/foxiswho/docker-rocketmq.git
| cd docker-rocketmq
| cd rmq
| chmod +x start.sh
| ./start.sh
控制台会输出rocketmq三台容器的状态
RocketMQ Docker 容器状态
此时我们通过浏览器访问localhost:8180查看到以下页面则说明安装成功。
RocketMQ 控制台
四、SpringBoot 整合 RocketMQ Demo
1. 创建一个SpringBoot项目,
使用IDEA-File-New Project-Spring Initializr,可以很轻松的创建出一个简单的Web工程。
2. 引入RocketMQ依赖
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
3. 配置 application.yml
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
# Producer 配置项
producer:
group: demo-producer-group # 生产者分组
send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
secret-key: # Secret Key
enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
# Consumer 配置项
consumer:
listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
test-consumer-group:
topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
4. 创建一个生产者类
生产者发送消息
@RestController
public class RocketController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 延时消息,RocketMQ支持这几个级别的延时消息,自定义需要修改broker配置文件
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
@GetMapping("/rocket/delayMsg/send")
public String rocketDelayMsgSend() {
LocalDateTime currentDateTime = LocalDateTime.now();
rocketMQTemplate.syncSend("rocket-topic-2:tag-2", MessageBuilder.withPayload(currentDateTime.toString()).build(), 2000, 3);
return currentDateTime.toString();
}
}
5. 创建一个消费者
消费者监听消息
@Slf4j
@Component
public class RokcetServiceListener {
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group-1", topic = "rocket-topic-2")
public class Consumer1 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("consumer1 rocket收到消息:{}", s);
}
}
//MessageModel.BROADCASTING 广播消息模式
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group-2", topic = "rocket-topic-2", selectorExpression = "tag-2", messageModel = MessageModel.BROADCASTING)
public class Consumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("consumer2 rocket收到消息:{}", s);
}
}
}
6.测试
我们在浏览器中访问localhost:8080/rocket/send,即可看到返回的时间戳
浏览器返回
同时在控制台可以看到两个消费者都获取到了这条消息(延时10s)
Consumer1和Consumer2都获取到了消息
在rocketMq-console也可以看到这条消息
rocketMq-console控制台
网络问题
- org.apache.rocketmq.remoting.exception.RemotingConnectException:connect to failed
本地调试项目时,不能直接访问 docker rocketmq 容器,因此我们需要将修改broker.conf配置,将/rmq/rmq/brokerconf目录下的broker.conf中的#brokerIP1=xxxxx注释去掉,并将IP地址改成局域网IP。