
ActiveMQ - 初步认识
消息中间件应用场景
场景说明: 用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信
场景说明: 用户下单后,订单系统需要通知库存系统。
场景说明: 秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。(在用户请求与秒杀业务处理中间加入消息队列)
ActiveMQ介绍与JMS协议
简介
ActiveMQ是遵守Apache开源规则的最流行,能力强劲的消息中间件。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS消息模式
消息中间件一般有两种传递模式:点对点模式(P2P)和发布-订阅模式(Pub/Sub)
点对点模型(Queue队列模型)
P2P: 即生产者和消费者之间的消息往来

每个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时
P2P特点:
- 每个消息只有一个消费者(Consumer),即一旦被消费,消息就不再在消息队列中
- 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
- 接收者在成功接收消息之后需向队列应答成功
发布/订阅模型(Publish-Subscribe)
发布/订阅
包含三个角色: 主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者。

发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。
发布/订阅模型的特点:
- 每个消息可以有多个消费者
- 发布者和订阅者之间有时间上的依赖性(先订阅再发布)
- 订阅者必须保持运行的状态,才能接收发布者发布的消息
JMS编程API
要素 |
作用 |
Destination |
表示消息所走通道的目标定义,用来定义消息从发送端发出后要走的通道,而不是接收方。Destination属于类对象 |
ConnectionFactory |
用于创建连接对象,ConnectionFactory属于管理类的对象 |
Connection |
连接接口,所负责的重要工作时创建Session |
Session |
会话接口,这是一个非常重要的对象,消息发送者、消息接收者以及消息对象本省,都是通过这个会话对象创建的 |
MessageConsume |
消息消费者,也就是订阅消息并处理消息的对象 |
MessageProducer |
消息的生产者,也就是用来发送消息的对象 |
- ConnectionFactory
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。
- Destination
Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue、Topic
- Connection
Connection表示在客户端和JMS系统之间建立的连接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。
- Session
Session是我们对消息进行操作的接口,可以通过Session创建生产者、消费者、消息等。Session提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。
- Producer
消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型: QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息
- Consumer
消息消费者由Session创建,用于接收被发送到Destination的消息。两张类型: QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的createDurableSubscriber方法来创建持久化的订阅者。
- MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

ActiveMQ安装
安装(Linux)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| 第一步: 安装JDK(略)
第二步: 下载activemq的压缩包(apache-activemq-5.15.12-bin.tar.gz)到Linux系统
第三步: 解压文件 tar -zxvf apache-activemq-5.15.12-bin.tar.gz
第四步: 进入apache-activemq-5.15.12的bin目录
cd apache-activemq-5.15.12/bin
第五步: 启动activemq ./activemq start (执行2次:第一次生产配置文件;第二次启动)
第六步: 停止activemq ./activemq stop
其它命令: ./activemq status -- 查看activemq的状态 ./activemq restart -- 重启activemq ./activemq purge FOO.BAR -- 删除队列中的所有消息,队列名称是FOO.BAR ./activemq dstat -- 显示默认broker的所有主题和队列统计信息 ./activemq dstat topics -- 显示主题的统计信息 ./activemq dstat queue -- 显示队列的统计信息 ...
|
访问
http://127.0.0.1:8161
1 2 3 4
| 页面控制台: http://ip:8161 (监控) 请求地址: tcp://ip:61616 (java代码访问消息中间件)
初始用户名和密码: admin/admin
|
原生JMS开发
点对点模式
生产者
- Maven引入依赖
1 2 3 4 5 6 7
| <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.11</version> </dependency> </dependencies>
|
- 编写生产消息的类(PTP_Producer.class)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class PTP_Producer { public static void main(String[] args) throws JMSException { //1.创建连接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.创建连接 Connection connection = factory.createConnection(); //3.打开连接 connection.start(); //4.创建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息) Queue queue = session.createQueue("queue01"); //6.创建消息生产者 MessageProducer producer=session.createProducer(queue); //7.创建消息 TextMessage message=session.createTextMessage("hello,this is PTP message"); //8.发送消息 producer.send(message); System.out.println("生产者发送完毕..."); //9.释放资源 session.close(); connection.close(); } }
|
- 运行效果


消费者
- Maven引入依赖
如上,略
- 编写接收消息的类(PTP_Consumer.class) – receive方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class PTP_Consumer { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("queue01"); MessageConsumer consumer = session.createConsumer(queue); while (true){ Message message = consumer.receive(); if(message == null){ break; }
if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; System.out.println("接受到的消息: "+textMessage.getText()); } } } }
|
- 编写接收消息的类(PTP_Consumer2.class) – 监听器方法(常用)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class PTP_Consumer2 { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("queue01"); MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("接收的消息(2):"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); } }
|
- 运行效果


发布订阅模式
生产者
- Maven引入依赖
如上,略
- 编写生产类(PS_Producer.class)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public class PS_Producer { public static void main(String[] args) throws JMSException { //1.创建连接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.创建连接 Connection connection = factory.createConnection(); //3.打开连接 connection.start(); //4.创建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息) Topic topic = session.createTopic("topic01"); //6.创建消息生产者 MessageProducer producer=session.createProducer(topic); //7.创建消息 TextMessage message=session.createTextMessage("hello,this is PS message"); //8.发送消息 producer.send(message); System.out.println("生产者发送完毕..."); //9.释放资源 session.close(); connection.close(); } }
|
- 运行效果


消费者
- Maven引入依赖
如上,略
- 编写生产类(PS_Consumer.class)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
public class PS_Consumer { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("topic01"); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("接收的消息---topic:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); } }
|
- 运行效果)

这时我们看到订阅到的topic消息是没有被消费的。上面有说到,在发布订阅模式下,一定要先启动消费者,然后才能消费到发布者推送的订阅的信息。让我们重新启动下PS_Producer类,再看看效果

这时已经成功获取到消息了,再看看页面控制台

消息入列2条,成功出列1条,1个消费者