赞
踩
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信
应用之间的远程调用
加入MQ后应用之间的调用
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合
系统的耦合性越高,容错性就越低,可维护性就越低
使用 MQ 使得应用间解耦,提升容错性和可维护性
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间
如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了
消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个消息,这样慢慢写入数据库,这样就不会卡死数据库了
但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”
实现MQ的大致有两种主流方式:AMQP、JMS。
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用
层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵循此协议,不收客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。类比HTTP
JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间
件的API
JMS 是 JavaEE 规范中的一种,类比JDBC
很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有
官网地址:http://www.rabbitmq.com/
架构图:
相关概念:
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由
模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)
模型:
在上图的模型种,有以下概念:
添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
创建工具类,用来给后面的几个模式的创建工程进行调用
package com.study.rabbitmq.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtils { public static Connection getConnetion() throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //主机地址;默认为 localhost (这里是我的虚拟机ip) connectionFactory.setHost("192.168.249.4"); //连接端口;默认为 5672 connectionFactory.setPort(5672); //虚拟主机名称;默认为 / (先在rabbitmq管理台创建该虚拟机) connectionFactory.setVirtualHost("/xzk"); //连接用户名;默认为guest (先在rabbitmq管理台创建该用户) connectionFactory.setUsername("wx"); //连接密码;默认为guest connectionFactory.setPassword("wx"); Connection connection = connectionFactory.newConnection(); return connection; } }
编写消息的生产者,也就是发送消息的程序
package com.study.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.study.rabbitmq.utils.ConnectionUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 模拟生成者 */ public class Producer { public static String QUEUE_NAME="simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnetion(); //创建频道 Channel channel = connection.createChannel(); //声明(创建)队列 /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接,只能有一个Consumer监听这个队列 * 参数4:是否在不使用的时候自动删除队列,当没有Consumer时,自动删除 * 参数5:队列其它参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //发送消息 String message = "你好:小兔子"; /** * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("已发送消息:" + message); //释放资源 channel.close(); connection.close(); } }
package com.study.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; import com.study.rabbitmq.utils.ConnectionUtils; /** * 模拟消费者 */ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnetion(); //创建频道(channel) Channel channel = connection.createChannel(); //声明(创建)队列 /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接,只能有一个Consumer监听(消费)这个队列 * 参数4:是否在不使用的时候自动删除队列,当没有Consumer时,自动删除 * 参数5:队列其它参数 */
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。