基于Netty简单手写消息中间件思路

上一篇 <<<消息中间件常见问题汇总
下一篇 >>>消息队列常用名词与中间件对比


1.设计思路

1.生产者和netty服务器之间建立长连接
2.消费者和netty服务器建立长连接,首次启动时主动拉取netty队列中的消息
3.生产者发送消息
4.消息从netty服务器主动推送给消费者

2.核心代码

2.1 包依赖

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.54</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.42.Final</version>
    </dependency>
    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling</artifactId>
        <version>1.4.10.Final</version>
    </dependency>
    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>1.4.10.Final</version>
    </dependency>
</dependencies>

2.2 消息实体类

/**
 * 消息实体类定义【必须实现序列化】
 */
@Data
public class MqEntity implements Serializable {

    /**
     * 队列名称
     */
    public String queueName;

    /**
     * 消息内容
     */
    public String mqMsg;

    /**
     * 是否生产者
     */
    public boolean isProducer;
}

2.3 服务端代码

public class MqNettyServer {

    public static void start(){
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //编码解码
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        socketChannel.pipeline().addLast(new ServerHandler());
                    }
                });
        try {
            ChannelFuture future = serverBootstrap.bind(Constants.PORT).sync();
            System.out.println("服务器启动成功:" + Constants.PORT);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) {
        start();
    }

}

/**
 * server消息处理器
 */
public class ServerHandler extends SimpleChannelInboundHandler<MqEntity> {

    /**
     * 客户端列表
     */
    private static Map<String,List<ChannelHandlerContext>> ctxs = new HashMap<String, List<ChannelHandlerContext>>();
    /**
     * 消息列表
     */
    private static Map<String, Queue> queueMap = new HashMap<String, Queue>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MqEntity mqEntity) throws Exception {
        String queueName = mqEntity.getQueueName();
        if(queueName==null||"".equals(queueName)){
            return;
        }
        boolean isProducer = mqEntity.isProducer;
        if(isProducer){
            producer(mqEntity);
            return;
        }
        consumer(ctx,mqEntity);
    }

    /**
     * 发送给消费者具体操作
     * 1、保存客户端队列信息
     * 2、如果队列中有值,则主动拉取
     */
    private void consumer(ChannelHandlerContext ctx, MqEntity mqEntity) {
        String queueName = mqEntity.getQueueName();
        Queue queue = queueMap.get(queueName);
        List<ChannelHandlerContext> channelHandlerContexts = ctxs.get(queueName);
        if(channelHandlerContexts==null){
            channelHandlerContexts = new ArrayList<ChannelHandlerContext>();
        }
        channelHandlerContexts.add(ctx);
        ctxs.put(queueName,channelHandlerContexts);
        if(queue==null||queue.isEmpty()){
            System.out.println("当前队列没有数据,直接返回");
        }
        ctx.writeAndFlush(queue.poll());
    }

    /**
     * 生产者具体操作
     * 1、如果队列不存在的话创建队列并加入数据
     * 2、设置队列map
     * 3、如果已经存在了客户端列表,则主动推送
     */
    private void producer(MqEntity mqEntity) {
        String queueName = mqEntity.getQueueName();
        Queue queue = queueMap.get(queueName);
        if(queue==null){
            queue = new LinkedList();
        }
        queue.offer(mqEntity);
        queueMap.put(queueName,queue);
        List<ChannelHandlerContext> channelHandlerContexts = ctxs.get(queueName);
        if(channelHandlerContexts!=null&&channelHandlerContexts.size()>0){
            for(ChannelHandlerContext ctx:channelHandlerContexts){
                ctx.writeAndFlush(queue.poll());
            }
        }
    }
}

2.4 生产者

/**
 *  生产者
 */
public class MqProducer {

    public static void start(MqEntity request){
        //创建nioEventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(Constants.HOST, Constants.PORT))
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //编码解码
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    }
                });
        try {
            // 发起同步连接
            ChannelFuture sync = bootstrap.connect().sync();
            System.out.println("生产者消息发送:"+ JSON.toJSONString(request));
            sync.channel().writeAndFlush(request);
            sync.channel().closeFuture().sync();
        } catch (Exception e) {

        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        MqEntity request = new MqEntity();
        request.setMqMsg("我的测试消息"+System.currentTimeMillis());
        request.setProducer(true);
        request.setQueueName(Constants.QUEUE_NAME);
        start(request);
    }
}

2.5 消费者

/**
 *  消费者
 */
public class MqConsumer {


    public static void start(MqEntity request){
        //创建nioEventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(Constants.HOST, Constants.PORT))
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //编码解码
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        ch.pipeline().addLast(new ConsumerHandler());
                    }
                });
        try {
            // 发起同步连接
            ChannelFuture sync = bootstrap.connect().sync();
            System.out.println("消费者接受消息开始:"+ JSON.toJSONString(request));
            sync.channel().writeAndFlush(request);
            sync.channel().closeFuture().sync();
        } catch (Exception e) {

        }
    }

    public static void main(String[] args) {
        MqEntity request = new MqEntity();
        request.setProducer(false);
        request.setQueueName(Constants.QUEUE_NAME);
        start(request);
    }
}

public class ConsumerHandler extends SimpleChannelInboundHandler<MqEntity> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MqEntity mqEntity) throws Exception {
        System.out.println("消费者获取消息:"+mqEntity.getMqMsg());
    }
}

推荐阅读:
<<<消息中间件的核心思想
<<<消息中间件常见问题汇总
<<<消息队列常用名词与中间件对比
<<<Rabbitmq基础知识
<<<Rabbitmq示例之点对点简单队列
<<<Rabbitmq示例之工作(公平)队列
<<<Rabbitmq示例之发布订阅模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq示例之RPC模式
<<<Rabbitmq队列模式总结
<<<Rabbitmq如何保证消息不丢失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保证幂等性
<<<Rabbitmq的重试策略
<<<Rabbitmq通过死信队列实现过期监听
<<<Rabbitmq解决分布式事务思路
<<<Rabbitmq解决分布式事务demo
<<<Rabbitmq环境安装
<<<Kafka中的专业术语都有哪些
<<<Kafka的设计原理介绍
<<<Kafka集群如何实现相互感知
<<<Kafka如何实现分区及指定分区消费
<<<Kafka如何保证消息顺序消费
<<<Kafka如何保证高吞吐量
<<<Kafka集群环境搭建
<<<RocketMQ架构原理
<<<RocketMQ、RabbitMQ和Kafka的对比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保证顺序消费demo
<<<RocketMQ如何动态扩容和缩容
<<<RocketMQ如何解决分布式事务
<<<RocketMQ单机版本安装
<<<RocketMQ集群环境程序启用相关知识点
<<<RocketMQ单机做主备实操
<<<RocketMQ所有配置说明

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

友情链接更多精彩内容