Java-JMS消息队列(ActiveMQ)

<blockquote><h4>认识消息队列</h4></blockquote>

  “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含对象……
  消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。

<blockquote><h4>JMS消息队列</h4></blockquote>

  Jms即[Java消息服务](http://baike.baidu.com/view/3292569.htm)(Java Message Service)[应用程序](http://baike.baidu.com/view/330120.htm)接口是一个[Java平台](http://baike.baidu.com/view/209634.htm)中关于面向[消息中间件](http://baike.baidu.com/view/3118541.htm)(MOM)的API,用于在两个应用程序之间,或[分布式系统](http://baike.baidu.com/view/991489.htm)中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供[商都](http://baike.baidu.com/view/19763.htm)对JMS提供支持。
  JMS(Java Messaging Service)是[Java](http://baike.baidu.com/view/29.htm)平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java[应用程序](http://baike.baidu.com/view/330120.htm)进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化[企业](http://baike.baidu.com/view/38340.htm)应用的开发,翻译为[Java](http://baike.baidu.com/view/29.htm)消息[服务](http://baike.baidu.com/view/133203.htm)。

<blockquote><h4>JMS对象模型</h4></blockquote>

1)连接工厂。连接工厂(ConnectionFactory)是由管理员创建,并绑定到JNDI树中。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
2)JMS连接。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。   
3)JMS会话。JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。   
4)JMS目的。JMS目的(Destination),又称为消息队列,是实际的消息源。
5)JMS生产者和消费者。生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。
6)JMS消息通常有两种类型:   ① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。   ② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。
<blockquote><h4>ActiveMQ代码实现</h4></blockquote>

1)下载ActiveMQ
去官方网站下载:http://activemq.apache.org/
2)解压运行AciveMQ
解压apache-activemq-5.13.3-bin.zip文件,运行apache-activemq-5.13.3\bin\win64\activemq.bat,启动ActiveMQ,登录http://localhost:8161/admin/,创建一个Queues,命名为my-activemq
3)创建Maven项目,添加依赖POM.xml

dependencies>
<!-- activemq 相关maven依赖 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.7.0</version>
        </dependency>
<!-- 日志相关依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>

4)创建消息发送者(生产者)Sender.java

package com.zxp.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 发送者
 * 2016年5月6日 下午3:00:57
 * @author zhangxiaoping
 */
public class Sender {
    private static final Logger LOGGER=LoggerFactory.getLogger(Sender.class);
    //默认代理地址 "failover://tcp://localhost:61616"  服务器地址不同IP修改不同的IP
    private static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
    //消息队列名称 
    private static final String SUBJECT="my-activemq";
    private static int i=1;
    public static void main(String[] args) throws JMSException, InterruptedException {
        //初始化连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
        //建立连接
        Connection conn= connectionFactory.createConnection();
        //启动连接
        conn.start();
         //创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
        Session session= conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //创建目标队列
        Destination dest = session.createQueue(SUBJECT);
        //通过session创建消息的发送者
        MessageProducer producer=session.createProducer(dest);
        while(true){
            //定义要发送的消息
            TextMessage message= session.createTextMessage("======ActiveMQ发送消息===="+i+"===");
            LOGGER.debug(message.getText());
            //发送消息
            producer.send(message);
            //休眠2秒
            Thread.sleep(2000);
            i++;
        }
//      conn.close();
        
    }

}

5)创建消息的接收者(消费者)Receiver.java

package com.zxp.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 接收者
 * 2016年5月6日 下午3:03:16
 * @author zhangxiaoping
 */
public class Receiver implements MessageListener{
    private static final Logger LOGGER=LoggerFactory.getLogger(Receiver.class);
    //默认代理地址 "failover://tcp://localhost:61616"  服务器地址不同IP修改不同的IP
    private static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
    //消息队列名称 
    private static final String SUBJECT="my-activemq";
    public static void main(String[] args) throws JMSException {
        //初始化连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
        //建立连接
        Connection conn= connectionFactory.createConnection();
        //启动连接
        conn.start();
         //创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
        Session session= conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目标队列
        Destination dest=session.createQueue(SUBJECT);
        //通过session创建消息的接收者
        MessageConsumer consumer= session.createConsumer(dest);
        //初始化监听
        Receiver receiver=new Receiver();
        //给接收者添加监听对象
        consumer.setMessageListener(receiver);
    }
    public void onMessage(Message arg0) {
        TextMessage message=(TextMessage) arg0;
        try {
            LOGGER.debug("接收到消息"+message.getText());
            Thread.sleep(4000);
        } catch (Exception e) {
            LOGGER.error("error"+e.getMessage());
        }
        
    }
}

6)运行Sender.java、Receiver.java登录http://localhost:8161/admin/查看队列信息。

消息队列.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • ActiveMQ 即时通讯服务 浅析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk阅读 5,384评论 0 11
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,092评论 19 139
  • 一、 消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能...
    步积阅读 57,320评论 10 138
  • 1、前言 之前我们通过两篇文章(架构设计:系统间通信(19)——MQ:消息协议(上)、架构设计:系统间通信(20)...
    境里婆娑阅读 5,895评论 0 4
  • 1 消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,...
    Bobby0322阅读 13,723评论 0 24

友情链接更多精彩内容