赞
踩
rabbitmq是基于amqp协议实现一套高效的数据传输组件,MQ(消息队列)。
常见的MQ:ActiveMQ、Kafka、RocketMQ、RabbitMQ
官方文档:https://www.rabbitmq.com/getstarted.html
1、消息异步通知(注册时邮箱认证、添加商品生成详情页和将商品添加到搜索库等)
2、消息顺序处理
3、消息延迟处理
4、请求削峰
1.1 simple简单模式
1)消息产生后将消息放入队列
2)消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)
1.2 work工作模式(资源的竞争)
1)消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样。保证一条消息只能被一个消费者使用)
2)应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)
1.3 publish/subscribe发布订阅(共享资源)
1)X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
2)相关场景:邮件群发,群聊天,广播(广告)
1.4 routing路由模式
1)消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息
2)根据业务功能定义路由字符串
3)从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误
1.5 topic 主题模式(路由模式的一种)
1)星号、#号代表通配符
2)星号代表一个单词,#号代表一个或多个单词
3)路由功能添加模糊匹配
4)消息产生者产生消息,把消息交给交换机
5)交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
1.6 RPC
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1)客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2)服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
3)服务端将RPC方法 的结果发送到RPC响应队列。
4)客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
1.1 simple简单模式
工具类
public class ConnectionUtil { //连接rabbitmq服务,共享一个工厂对象 private static ConnectionFactory factory; static { factory=new ConnectionFactory(); //设置rabbitmq属性 factory.setHost("192.168.65.128"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/admin"); factory.setPort(5672); } public static Connection getConnection(){ Connection connection=null; try { //获取连接对象 connection = factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return connection; } }
提供方
public class Provider { public static void main(String[] args) { try { //获取连接对象 Connection connection = ConnectionUtil.getConnection(); //获取通道对象 Channel channel = connection.createChannel(); //通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建) channel.queueDeclare("queue1",false,false,false,null); //向队列中发送消息 channel.basicPublish("","queue1",null,"Hello RabbitMQ!!!".getBytes()); //断开连接 connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
消费方
public class Consumer { public static void main(String[] args) { Connection connection = ConnectionUtil.getConnection(); try { //获取通道对象 Channel channel = connection.createChannel(); //监听队列中的消息(消费的是队列,而不是交换机) channel.basicConsume("queue1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
1.2 work工作模式(资源的竞争)
提供方
public class Provider { public static void main(String[] args) { try { //获取连接对象 Connection connection = ConnectionUtil.getConnection(); //获取通道对象 Channel channel = connection.createChannel(); //通过通道创建队列 channel.queueDeclare("queue1",false,false,false,null); //向队列中发送消息 for(int i=1;i<=10;i++){ channel.basicPublish("","queue1",null,("Hello RabbitMQ!!!"+i).getBytes()); } //断开连接 connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
消费方1
public class Consumer { public static void main(String[] args) { Connection connection = ConnectionUtil.getConnection(); try { //获取通道对象 Channel channel = connection.createChannel(); //监听队列中的消息 channel.basicConsume("queue1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
消费方2
public class Consumer2 { public static void main(String[] args) { Connection connection = ConnectionUtil.getConnection(); try { //获取通道对象 Channel channel = connection.createChannel(); //监听队列中的消息 channel.basicConsume("queue1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
1.3 publish/subscribe发布订阅(共享资源)
提供方
//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建 public class Provider { public static void main(String[] args) { try { //获取连接对象 Connection connection = ConnectionUtil.getConnection(); //获取通道对象 Channel channel = connection.createChannel(); //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失) //1.参数一:交换机名称 参数二:交换机类型 channel.exchangeDeclare("fanout_exchange","fanout"); //通过通道创建队列 //channel.queueDeclare("queue1",false,false,false,null); //向队列中发送消息 for(int i=1;i<=10;i++){ channel.basicPublish("fanout_exchange","",null,("Hello RabbitMQ!!!"+i).getBytes()); } //断开连接 connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
消费方1
public class Consumer { public static void main(String[] args) { Connection connection = ConnectionUtil.getConnection(); try { //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("fanout_queue1",false,false,false,null); //给队列绑定交换机 channel.queueBind("fanout_queue1","fanout_exchange",""); //监听队列中的消息 channel.basicConsume("fanout_queue1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
消费方2
public class Consumer2 { public static void main(String[] args) { Connection connection = ConnectionUtil.getConnection(); try { //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("fanout_queue2",false,false,false,null); //给队列绑定交换机 channel.queueBind("fanout_queue2","fanout_exchange",""); //监听队列中的消息 channel.basicConsume("fanout_queue2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
1.4 routing路由模式(不支持通配符)
提供方
//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建 public class Provider { public static void main(String[] args) { try { //获取连接对象 Connection connection = ConnectionUtil.getConnection(); //获取通道对象 Channel channel = connection.createChannel(); //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失) //1.参数一:交换机名称 参数二:交换机类型 channel.exchangeDeclare("direct_exchange","direct"); //向队列中发送消息 for(int i=1;i<=10;i++){ channel.basicPublish("direct_exchange", "insert", //设置路由键,符合路由键的队列,才能拿到消息 null, ("Hello RabbitMQ!!!"+i).getBytes()); } //断开连接 connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
消费方1
public class Consumer { public static void main(String[] args) { Connection connection = ConnectionUtil.getConnection(); try { //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("direct_queue1",false,false,false,null); //绑定交换机(routingKey:路由键) channel.queueBind("direct_queue1","direct_exchange","select"); channel.queueBind("direct_queue1","direct_exchange","insert"); //监听队列中的消息 channel.basicConsume("direct_queue1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
消费方2
public class Consumer2 { public static void main(String[] args) { Connection connection = ConnectionUtil.getConnection(); try { //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("direct_queue2",false,false,false,null); //绑定交换机(routingKey:路由键) channel.queueBind("direct_queue2","direct_exchange","delete"); channel.queueBind("direct_queue2","direct_exchange","select"); //监听队列中的消息 channel.basicConsume("direct_queue2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
1.5 topic 主题模式(路由模式的一种,支持通配符)
提供方
//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建 public class Provider { public static void main(String[] args) { try { //获取连接对象 Connection connection = ConnectionUtil.getConnection(); //获取通道对象 Channel channel = connection.createChannel(); //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失) //1.参数一:交换机名称 参数二:交换机类型 channel.exchangeDeclare("topic_exchange","topic"); //向队列中发送消息 for(int i=1;i<=10;i++){ channel.basicPublish("topic_exchange", "emp.hello world", // #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况) *(匹配一个单词) null, ("Hello RabbitMQ!!!"+i).getBytes()); } //断开连接 connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
消费方1
public class Consumer { public static void main(String[] args) { Connection connection = ConnectionUtil.getConnection(); try { //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("topic_queue1",false,false,false,null); //绑定交换机(routingKey:路由键) #:匹配0-n个单词(之间以.区分,两点之间算一个单词) channel.queueBind("topic_queue1","topic_exchange","emp.#"); //监听队列中的消息 channel.basicConsume("topic_queue1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
消费方2
public class Consumer2 { public static void main(String[] args) { Connection connection = ConnectionUtil.getConnection(); try { //获取通道对象 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("topic_queue2",false,false,false,null); //绑定交换机(routingKey:路由键) *:匹配1个单词(之间以.区分,两点之间算一个单词) channel.queueBind("topic_queue2","topic_exchange","emp.*"); //监听队列中的消息 channel.basicConsume("topic_queue2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2获得消息为:"+new String(body,"utf-8")); } }); //消费方不需要关闭连接,保持一直监听队列状态 //connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
1、交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。保证,哪一方先运行则在哪一方创建。
2、交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失。
3、通配符:#:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况) 、 *(匹配一个单词)
有不清楚的地方可以在评论下方留言。。。既然来了,不妨点个关注,点个赞吧!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。