赞
踩
MQ全称为Message Queue,即消息队列。
“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
解耦。如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可。
异步。如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。
削峰。如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。
RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件:
首先到erlang官网下载win10版安装包。
环境变量
使用cmd命令,输入 erl -version 验证:
在RabbitMQ的gitHub项目中,下载window版本的服务端安装包。
安装完成后,找到安装目录:
在此目录下打开cmd命令,输入rabbitmq-plugins enable rabbitmq_management命令安装管理页面的插件:
如若报错,应该是版本不兼容,找到兼容版本即可
重启mq服务:
打开浏览器输入http://localhost:15672,账号密码默认是:guest/guest
生产者发送消息流程
生产者和Broker建立TCP连接
生产者和Broker建立通道
生产者通过通道消息发送给Broker,由Exchange将消息进行转发
Exchange将消息转发到指定的Queue(队列)
消费者接收消息流程
消费者和Broker建立TCP连接
消费者和Broker建立通道
消费者监听指定的Queue(队列)
当有消息到达Queue时Broker默认将消息推送给消费者
消费者接收到消息
ack回复
在上图的模型中,有以下概念:
新建一个maven工程,添加amqp-client依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
连接工具类
public class ConnectionUtil { /** * 建立与RabbitMQ的连接 * @return * @throws Exception */ public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("192.168.1.103"); //端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/kavito");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq factory.setUsername("kavito"); factory.setPassword("123456"); // 通过工厂获取连接 Connection connection = factory.newConnection(); return connection; } }
生产者发送消息
public class Send { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 1、获取到连接 Connection connection = ConnectionUtil.getConnection(); // 2、从连接中创建通道,使用通道才能完成消息相关的操作 Channel channel = connection.createChannel(); // 3、声明(创建)队列 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4、消息内容 String message = "Hello World!"; // 向指定的队列中发送消息 //参数:String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细: * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"") * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、props,消息的属性 * 4、body,消息内容 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //关闭通道和连接(资源关闭最好用try-catch-finally语句处理) channel.close(); connection.close(); } }
控制台输出
web管理页面
点击队列名称,进入详情页,可以查看消息:
public class Recv { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); // 声明队列 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); //实现消费方法 DefaultConsumer consumer = new DefaultConsumer(channel){ // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); // body 即消息体 String msg = new String(body,"utf-8"); System.out.println(" [x] received : " + msg + "!"); } }; // 监听队列,第二个参数:是否自动进行消息确认。 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_NAME, true, consumer); } }
控制台
再看看队列的消息,已经被消费了
消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
RabbitMQ怎么知道消息被接收了呢?如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
综上,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
自动ACK:消息一旦被接收,消费者自动发送ACK(如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便)
手动ACK:消息接收后,不会发送ACK,需要手动调用(如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果w此时消费者宕机,那么消息就丢失了。)
rabbit默认是自动ACK的,来看下自动ACK会出现的问题:
直接运行,消息发送成功:
运行消费者,程序抛出异常:
管理界面,消费者抛出异常,但是消息依然被消费,实际上我们还没获取到消息
那么,在手动进行ack前抛出异常,运行Recv2
再看看管理界面,发现数据没有被消费掉
public class Recv2 { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 创建通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); // 手动进行ACK /* * void basicAck(long deliveryTag, boolean multiple) throws IOException; * deliveryTag:用来标识消息的id * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。 */ channel.basicAck(envelope.getDeliveryTag(), false); } }; // 监听队列,第二个参数false,手动进行ACK channel.basicConsume(QUEUE_NAME, false, consumer); } }
和前面模式不同:
public class Send { private final static String EXCHANGE_NAME = "test_fanout_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为fanout channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息内容 String message = "注册成功!!"; // 发布消息到Exchange channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [生产者] Sent '" + message + "'"); channel.close(); connection.close(); } }
注册成功发给短信服务
public class Recv { private final static String QUEUE_NAME = "fanout_exchange_queue_sms";//短信队列 private final static String EXCHANGE_NAME = "test_fanout_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [短信服务] received : " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
注册成功发给邮件服务
public class Recv2 { private final static String QUEUE_NAME = "fanout_exchange_queue_email";//邮件队列 private final static String EXCHANGE_NAME = "test_fanout_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [邮件服务] received : " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
我们运行两个消费者,然后发送1条消息:
public class Send { private final static String EXCHANGE_NAME = "test_direct_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为direct channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 消息内容, String message = "注册成功!请短信回复[T]退订"; // 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息 channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信队列 private final static String EXCHANGE_NAME = "test_direct_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收发送方指定routing key为sms的消息 //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email"); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [短信服务] received : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
public class Recv2 { private final static String QUEUE_NAME = "direct_exchange_queue_email";//邮件队列 private final static String EXCHANGE_NAME = "test_direct_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收发送方指定routing key为email的消息 // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [邮件服务] received : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
我们发送sms的RoutingKey,发现结果:只有指定短信的消费者1收到消息了
每个消费者监听自己的队列,并且设置带统配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
通配符规则:
public class Send { private final static String EXCHANGE_NAME = "test_topic_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为topic channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 消息内容 String message = "这是一只行动迅速的橙色的兔子"; // 发送消息,并且指定routing key为:quick.orange.rabbit channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes()); System.out.println(" [动物描述:] Sent '" + message + "'"); channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "topic_exchange_queue_Q1"; private final static String EXCHANGE_NAME = "test_topic_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。订阅所有的橙色动物 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*"); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [消费者1] received : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
public class Recv2 { private final static String QUEUE_NAME = "topic_exchange_queue_Q2"; private final static String EXCHANGE_NAME = "test_topic_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。订阅关于兔子以及懒惰动物的消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#"); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [消费者2] received : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
结果C1、C2是都接收到消息了:
创建两个工程:mq-rabbitmq-producer和mq-rabbitmq-consumer,分别配置1、2、3(第三步本例消费者用注解形式,可以不用配)
1、添加AMQP的启动器:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>
2、在application.yml
中添加RabbitMQ的配置:
server: port: 10086 spring: application: name: mq-rabbitmq-producer rabbitmq: host: 192.168.1.103 port: 5672 username: kavito password: 123456 virtualHost: /kavito template: retry: enabled: true initial-interval: 10000ms max-interval: 300000ms multiplier: 2 exchange: topic.exchange publisher-confirms: true
属性说明:
当然如果consumer只是接收消息而不发送,就不用配置template相关内容。
3、定义RabbitConfig配置类,配置\Exchange、Queue、及绑定交换机。
@Configuration public class RabbitmqConfig { public static final String QUEUE_EMAIL = "queue_email";//email队列 public static final String QUEUE_SMS = "queue_sms";//sms队列 public static final String EXCHANGE_NAME="topic.exchange";//topics类型交换机 public static final String ROUTINGKEY_EMAIL="topic.#.email.#"; public static final String ROUTINGKEY_SMS="topic.#.sms.#"; //声明交换机 @Bean(EXCHANGE_NAME) public Exchange exchange(){ //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //声明email队列 /* * new Queue(QUEUE_EMAIL,true,false,false) * durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列 * auto-delete 表示消息队列没有在使用时将被自动删除 默认是false * exclusive 表示该消息队列是否只在当前connection生效,默认是false */ @Bean(QUEUE_EMAIL) public Queue emailQueue(){ return new Queue(QUEUE_EMAIL); } //声明sms队列 @Bean(QUEUE_SMS) public Queue smsQueue(){ return new Queue(QUEUE_SMS); } //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey @Bean public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } //ROUTINGKEY_SMS队列绑定交换机,指定routingKey @Bean public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }
@SpringBootTest @RunWith(SpringRunner.class) public class Send { @Autowired RabbitTemplate rabbitTemplate; @Test public void sendMsgByTopics(){ /** * 参数: * 1、交换机名称 * 2、routingKey * 3、消息内容 */ for (int i=0;i<5;i++){ String message = "恭喜您,注册成功!userid="+i; rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"topic.sms.email",message); System.out.println(" [x] Sent '" + message + "'"); } } }
运行测试类发送5条消息:
web管理界面: 创建了交换机以及queue_email、queue_sms 2个队列,并且向这两个队列分别发送了5条消息
在SpringAmqp中,对消息的消费者进行了封装和抽象。一个JavaBean的方法,只要添加@RabbitListener注解,就可以成为了一个消费者。
@Component public class ReceiveHandler { //监听邮件队列 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue_email", durable = "true"), exchange = @Exchange( value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC ), key = {"topic.#.email.#","email.*"})) public void rece_email(String msg){ System.out.println(" [邮件服务] received : " + msg + "!"); } //监听短信队列 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue_sms", durable = "true"), exchange = @Exchange( value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC ), key = {"topic.#.sms.#"})) public void rece_sms(String msg){ System.out.println(" [短信服务] received : " + msg + "!"); } }
属性说明:
启动mq-rabbitmq-comsumer项目
大功告成!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。