MAC上 Docker 安装部署 RocketMQ 整合 SpringBoot 实践

一、RocketMQ的优势?

  1. 底层是Java实现的,于阅读源码、了解实现有利(RabbitMQ 底层是 Erlang,kafka 底层是 Scala)
  2. 能够保证严格的消息顺序
  3. 提供了丰富的消息拉取模式
  4. 高效的订阅者水平扩展能力
  5. 实时的消息订阅机制
  6. 亿级的消息堆积能力

二、整体流程

官方给出的 RocketMQ 架构图
  1. 启动 Namesrv,Namesrv起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心
  2. Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包(心跳包中,包含当前Broker信息-IP和端口等,以及存储所有Topic信息,注册成功后,Namesrv集群中就有Topic跟Broker 的映射关系)
  3. 收发消息前,先创建Topic。创建Topic时,需要指定该Topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
  4. Producer 发送消息。(启东时,先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。)
  5. Consumer消费消息。(Consumer跟Producer类似,跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。)

三、Docker搭建RocketMQ

rocketmq的docker镜像可以自己制作,官方文档中有详细介绍:rocketmq-docker

我找到了全网最快捷的搭建方式,使用foxiswho的镜像:foxiswho/rocketmq

在自己新建的rocketmq目录下打开终端,执行以下命令:

| git clone https://github.com/foxiswho/docker-rocketmq.git   
| cd docker-rocketmq 
| cd rmq
| chmod +x start.sh
| ./start.sh

控制台会输出rocketmq三台容器的状态


RocketMQ Docker 容器状态

此时我们通过浏览器访问localhost:8180查看到以下页面则说明安装成功。


RocketMQ 控制台

四、SpringBoot 整合 RocketMQ Demo

1. 创建一个SpringBoot项目,

使用IDEA-File-New Project-Spring Initializr,可以很轻松的创建出一个简单的Web工程。

2. 引入RocketMQ依赖

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

3. 配置 application.yml

# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
  name-server: 127.0.0.1:9876 # RocketMQ Namesrv
  # Producer 配置项
  producer:
    group: demo-producer-group # 生产者分组
    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
    access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
    secret-key: # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
  # Consumer 配置项
  consumer:
    listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
      test-consumer-group:
        topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费

4. 创建一个生产者类

生产者发送消息

@RestController
public class RocketController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 延时消息,RocketMQ支持这几个级别的延时消息,自定义需要修改broker配置文件
    // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    @GetMapping("/rocket/delayMsg/send")
    public String rocketDelayMsgSend() {
        LocalDateTime currentDateTime = LocalDateTime.now();
        rocketMQTemplate.syncSend("rocket-topic-2:tag-2", MessageBuilder.withPayload(currentDateTime.toString()).build(), 2000, 3);
        return currentDateTime.toString();
    }
}

5. 创建一个消费者

消费者监听消息

@Slf4j
@Component
public class RokcetServiceListener {

    @Service
    @RocketMQMessageListener(consumerGroup = "consumer-group-1", topic = "rocket-topic-2")
    public class Consumer1 implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            log.info("consumer1 rocket收到消息:{}", s);
        }
    }
  
    //MessageModel.BROADCASTING 广播消息模式
    @Service
    @RocketMQMessageListener(consumerGroup = "consumer-group-2", topic = "rocket-topic-2", selectorExpression = "tag-2", messageModel = MessageModel.BROADCASTING)
    public class Consumer2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            log.info("consumer2 rocket收到消息:{}", s);
        }
    }
}

6.测试

我们在浏览器中访问localhost:8080/rocket/send,即可看到返回的时间戳

浏览器返回

同时在控制台可以看到两个消费者都获取到了这条消息(延时10s)

Consumer1和Consumer2都获取到了消息

在rocketMq-console也可以看到这条消息

rocketMq-console控制台

网络问题

  • org.apache.rocketmq.remoting.exception.RemotingConnectException:connect to failed

本地调试项目时,不能直接访问 docker rocketmq 容器,因此我们需要将修改broker.conf配置,将/rmq/rmq/brokerconf目录下的broker.conf中的#brokerIP1=xxxxx注释去掉,并将IP地址改成局域网IP。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容