Rabbitmq示例之点对点简单队列

上一篇 <<<Rabbitmq基础知识
下一篇 >>>Rabbitmq示例之工作(公平)队列


1.特点

默认的传统队列是为均摊消费,存在不公平性;
如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。

2.程序代码示例

2.1依赖包

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5 </version>
    </dependency>
</dependencies>

2.2 连接类

public class RabitMQConnection {

    public static Connection getConnection() throws IOException, TimeoutException {
        // 1.创建我们的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2.设置我们的连接地址
        connectionFactory.setHost("10.211.55.16");
        // 3.设置我们的端口号
        connectionFactory.setPort(5672);
        // 4.设置账号和密码
        connectionFactory.setUsername("jiang");
        connectionFactory.setPassword("123456");
        // 5.设置VirtualHost
        connectionFactory.setVirtualHost("/mytest1205");
        return connectionFactory.newConnection();
    }
}

2.3 生产者

public class Producer {
    private static final String QUEUE_NAME = "test1205";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("生产者启动成功..");
        // 1.创建我们的连接
        Connection connection = RabitMQConnection.getConnection();
        // 2.创建我们通道
        Channel channel = connection.createChannel();
        for (int i = 0; i < 10; i++) {
            String msg = "发送测试内容-" + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("生产者发送消息成功:" + msg);
        }
        channel.close();
        connection.close();
    }
}

2.4 消费者

public class Consumer {
    private static final String QUEUE_NAME = "test1205";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建我们的连接
        Connection connection = RabitMQConnection.getConnection();
        // 2.创建我们通道
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费消息msg:" + msg);
            }
        };
        // 3.创建我们的监听的消息
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
   
    }
}

3.测试效果

消费者开多个时,默认是采用轮询(均摊)机制


推荐阅读:
<<<消息中间件的核心思想
<<<消息中间件常见问题汇总
<<<基于Netty简单手写消息中间件思路
<<<消息队列常用名词与中间件对比
<<<Rabbitmq基础知识
<<<Rabbitmq示例之工作(公平)队列
<<<Rabbitmq示例之发布订阅模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq示例之RPC模式
<<<Rabbitmq队列模式总结
<<<Rabbitmq如何保证消息不丢失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保证幂等性
<<<Rabbitmq的重试策略
<<<Rabbitmq通过死信队列实现过期监听
<<<Rabbitmq解决分布式事务思路
<<<Rabbitmq解决分布式事务demo
<<<Rabbitmq环境安装
<<<Kafka中的专业术语都有哪些
<<<Kafka的设计原理介绍
<<<Kafka集群如何实现相互感知
<<<Kafka如何实现分区及指定分区消费
<<<Kafka如何保证消息顺序消费
<<<Kafka如何保证高吞吐量
<<<Kafka集群环境搭建
<<<RocketMQ架构原理
<<<RocketMQ、RabbitMQ和Kafka的对比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保证顺序消费demo
<<<RocketMQ如何动态扩容和缩容
<<<RocketMQ如何解决分布式事务
<<<RocketMQ单机版本安装
<<<RocketMQ集群环境程序启用相关知识点
<<<RocketMQ单机做主备实操
<<<RocketMQ所有配置说明

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容