赞
踩
1.1 MQ概述
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息进行持久化的容器。多用于分布式系统之间进行通信,比如在分布式系统中,多个项目就可以使用MQ来进行通信,A项目的数据发送MQ容器中,B项目在通过MQ容器拿到消息
1.2 应用解耦
上图可以看出库存、支付、物流、都依赖于订单系统,如果库存系统挂了势必会影响订单系统,如果订单系统也挂了,库存、支付、物流都会挂掉,如果我增加一个X系统也依赖于订单系统,也会挂掉,耦合性太高了,下面我们用MQ来解耦
上图使用MQ解耦后,就好多嘞,订单系统的数据发送给MQ,库存、支付、物流系统直接依赖于MQ,库存挂了不影响其他系统了,因为订单、支付、物流都和MQ打交到了,哪怕我再增加一个X系统,也和MQ打交道了,库存系统恢复了后,直接也和MQ建立关系,继续运作。
1.3 异步提速
上图可以看出订单系统去DB里查询数据需要20ms,库存、支付、物流三个再去查询订单各自都需要300ms,每个系统得到结果都需要320ms。
上图使用MQ解耦后,就快多了,订单数据在DB查询出来传递给MQ,库存、支付、物流三个在DB中拿取只有5秒,每个系统得到数据只有25ms。
1.4 削峰填谷
上图比如A系统的某个接口,一次最多可以接收到1000个请求,高并发情况下每秒达到了5000个请求,这样系统肯定会崩溃的,使用MQ进行削峰填谷
上图可以看出加上了MQ后,一次5000个请求全部到达MQ里面去了,但是这种情况MQ会积压很多消息,这就叫做削峰,等到高峰期过了之后,A系统设置每秒从MQ拉取1000次,慢慢的消费完MQ里的消息,叫做填谷。
1.5 常见的 MQ 产品
1.6 RabbitMQ 简介
2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
上图就是RabbitMq中的简介
Producer:生产者,消息的发起者
Consumer:消费者,消息的接受者
Connection:生产者和消费者与MQ服务器建立的长连接
channel:相当于轻量级的Connection,每次发送接收消息都建立一个Connection长连接太浪费资源了,就在Connection内部建立逻辑连接channel。
Broker:就是MQ服务器
Virtual host:虚拟机,相当于一个MQ服务器的一个分组,可以有多个
Exchange:交换机,消息到达MQ的第一站,消息由交换机根据routing key转发给不同的队列。
Queue:队列,消息最终由交换机到达队列等待消费者来拿取
Binding:绑定交换机和队列之间的关系
1.7 RabbitMQ 提供了 5种工作模式
RabbitMQ 提供了 5 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
下面就来熟悉5种模式,先看看项目结构
首先需要创建一个maven项目,这里不再讲述如何创建maven项目了
创建maven项目后,需要先创建RabbitMQ所需要的工具类
在com.wdp.rabbitmq.utils包下创建RabbitUtils工具类,该工具类是存放RabbitMQ账号密码虚拟机连接等
package com.wdp.rabbitmq.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitUtils { private static ConnectionFactory connectionFactory = new ConnectionFactory(); static { connectionFactory.setHost("39.107.90.1");//服务器地址 connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号 connectionFactory.setUsername("wdp123"); //mq的账号 connectionFactory.setPassword("wdp123"); //mq的密码 connectionFactory.setVirtualHost("wdp"); //mq的虚拟机 } /** * 建立与mq的长连接 * @return */ public static Connection getConnection(){ Connection conn = null; try { conn = connectionFactory.newConnection(); return conn; } catch (Exception e) { throw new RuntimeException(e); } } }
在com.wdp.rabbitmq.utils包下创建RabbitConstant类定义交换机和队列的名称
package com.wdp.rabbitmq.utils;
public class RabbitConstant {
public static final String QUEUE_HELLOWORLD = "helloworld";
public static final String QUEUE_SMS = "sms";
public static final String EXCHANGE_WEATHER = "weather";
public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
public static final String QUEUE_BAIDU = "baidu";
public static final String QUEUE_SINA = "sina";
public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}
2.1、简单队列
P:消费者
C:消费者
红框:队列
简单队列只有一个生产者一个消费者,发送消息过后,消费者去队列拿取消息消费
在com.wdp.rabbitmq.hellword包下创建生产者
package com.wdp.rabbitmq.hellword; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //获取mq建立的长连接 Connection connection = RabbitUtils.getConnection(); //创建通信通道,生产者从通道发送数据到mq服务器 Channel channel = connection.createChannel(); //创建队列,声明并创建一个队列,如果队列已经存在直接使用 //第一个参数:队列名称 //第二个参数:数据是否持久化,false对应不持久化的数据,如果数据没有被消费者消费,消费者MQ停掉数据就会丢失 //第三个参数:队列是否私有化,false代表所有消费者都可以访问,true代表只有第一次使用它的消费者才可以访问 //第四个参数:是否自动删除,false代表停掉后不自动删除这个队列 //其他额外的参数,null channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null); String message = "hello word"; //第一个参数:交换机,简单队列暂时不需要 //第二个参数:队列的名称 //第三个参数:额外的设置属性 //第四个参数:传递消息的字节数组 channel.basicPublish("",RabbitConstant.QUEUE_HELLOWORLD,null,message.getBytes()); channel.close(); connection.close(); System.out.println("发送成功"); } }
在com.wdp.rabbitmq.hellword包下创建消费者
package com.wdp.rabbitmq.hellword; import com.rabbitmq.client.*; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; public class Consumer { public static void main(String[] args) throws IOException { //获取mq建立的长连接 Connection connection = RabbitUtils.getConnection(); //创建通信通道,生产者从通道发送数据到mq服务器 Channel channel = connection.createChannel(); //创建队列,声明并创建一个队列,如果队列已经存在直接使用 //第一个参数:队列名称 //第二个参数:数据是否持久化,false对应不持久化的数据,如果数据没有被消费者消费,消费者MQ停掉数据就会丢失 //第三个参数:队列是否私有化,false代表所有消费者都可以访问,true代表只有第一次使用它的消费者才可以访问 //第四个参数:是否自动删除,false代表停掉后不自动删除这个队列 //其他额外的参数,null channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null); //从MQ服务器中获取数据 //第一个参数:队列名 //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法 //第三个参数要传入DefaultConsumer的实现类,手动进行确认消息 channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD,false,new Reciver(channel)); } } class Reciver extends DefaultConsumer{ private Channel channel; //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到 public Reciver(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费者接收到的消息:"+message); System.out.println("消息的TagId:"+envelope.getDeliveryTag()); //false只确认签收当前的信息,设置为true的时候则代表签收该消费者所有未签收的消息 channel.basicAck(envelope.getDeliveryTag(),false); } }
注意:简单队列是不需要交换机的,会走默认的交换机
2.2 Work queues 工作队列模式
在这里插入图片描述
P:消费者
C1:消费者
C2:消费者
红框:队列
工作队列:是生产者投递到了队列,消费者会有多个同时去队列去。
应用场景:多个消费者同时绑定一个队列,比如一条短信要让所有人收到,就可以使用工作队列模式
在com.wdp.rabbitmq.workqueue包下创建一个SMS类用于发送消息
package com.wdp.rabbitmq.workqueue; public class SMS { private String name; private String mobile; private String content; public SMS(String name, String mobile, String content) { this.name = name; this.mobile = mobile; this.content = content; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMobile() { return mobile; } public void setMobile(String mobile) { this.mobile = mobile; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
在com.wdp.rabbitmq.workqueue下创建OrderSystem类
package com.wdp.rabbitmq.workqueue; import com.google.gson.Gson; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; public class OrderSystem { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); for(int i = 1 ; i <= 100 ; i++) { SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功"); String jsonSMS = new Gson().toJson(sms); channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes()); } System.out.println("发送数据成功"); channel.close(); connection.close(); } }
在com.wdp.rabbitmq.workqueue包下创建SMSSender1消费者
package com.wdp.rabbitmq.workqueue; import com.rabbitmq.client.*; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; /** * * 消费者 */ public class SMSSender1 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender1-短信发送成功:" + jsonSMS); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
在com.wdp.rabbitmq.workqueue包下创建SMSSender2消费者
package com.wdp.rabbitmq.workqueue; import com.rabbitmq.client.*; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; /** * * 消费者 */ public class SMSSender2 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender2-短信发送成功:" + jsonSMS); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
在com.wdp.rabbitmq.workqueue包下创建SMSSender3消费者
package com.wdp.rabbitmq.workqueue; import com.rabbitmq.client.*; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; /** * * 消费者 */ public class SMSSender3 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender3-短信发送成功:" + jsonSMS); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
注意:
channel.basicQos(1);这句代码,这句代码的意思是一个消息必须处理签收完毕在能开始下一个消息,思想就是能者多劳,多个消费者有的执行的快多消费,有的执行的慢就慢消费,工作队列是一对多,必须多个消费者绑定一个队列。
2.3 Pub/Sub 订阅模式
P:生产者
X:交换机
C1:消费者
C2:消费者
红框:队列
发布订阅模式分为
1、生产者发送消息需要指定一个交换机,先发送到交换机里
2、消费者需要建立队列和交换机的绑定关系
3、交换机将消息推送给绑定了该交换机的队列
交换机有三种模式:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key的队列
Topic:通配符,把消息交给通过#,*匹配的(路由模式)队列
发布订阅模式的使用场景:比如北京市的天气预报,新浪和百度都需要北京市的天气预报,生产者将天气的消息投递给交换机,百度的队列和新浪的队列都绑定了这个交换机,百度和新浪都可以拿到北京市的天气预报,发布订阅模式使用的是广播模式
在com.wdp.rabbitmq.pubsub包下创建生产者WeatherBureau
package com.wdp.rabbitmq.pubsub; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class WeatherBureau { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); String input = new Scanner(System.in).next(); Channel channel = connection.createChannel(); channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, input.getBytes()); channel.close(); connection.close(); } }
在com.wdp.rabbitmq.pubsub包下创建Sina
package com.wdp.rabbitmq.pubsub; import com.rabbitmq.client.*; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; public class Sina { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //queueBind用于交换机和队列的绑定 //参数1: 队列名 参数2:交互机名 参数3:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER,""); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SINA,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
在com.wdp.rabbitmq.pubsub包下创建BiaDu
package com.wdp.rabbitmq.pubsub; import com.rabbitmq.client.*; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; public class BiaDu { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于交换机和队列的绑定 //参数1: 队列名 参数2:交互机名 参数3:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER,""); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
2.4 Routing 路由模式
P:生产者
X:交换机
C1:消费者
C2:消费者
红框:队列
Routing 路由模式分为3个步骤
1、生产者发送消息到交换机的时候会绑定一个Routing key
2、消费者去队列取消息的时候,消费者建立队列和交换机和Routing key的关系
3、交换机会根据队列的Routing key进行分发消息
上图的图解:
生产者发送error、info、warning为Routing key的消息到交换机,交换机将Routing key为error的消息发送给c1消费者所在的队列,因为c1消费者指定队列消息的Routing key为error,交换机将Routing key为error、info、warning发送给c2消费者所在的队列,因为c2消费者指定队列消息的Routing key为error、info、warning。
应用场景:还是用天气举例,比如北京市和上海市,两个不同的城市天气也不一样,就需要交换机发送不同的天气信息给不同的城市
在com.wdp.rabbitmq.routing包下创建生产者WeatherBureau
package com.wdp.rabbitmq.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; /** * * 发布者 */ public class WeatherBureau { public static void main(String[] args) throws Exception { Map area = new LinkedHashMap<String, String>(); area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据"); area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据"); area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据"); area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据"); area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据"); area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据"); area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据"); area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); Iterator<Map.Entry<String,String>> itr = area.entrySet().iterator(); while (itr.hasNext()){ Map.Entry<String, String> me = itr.next(); //第一个参数交换机名字 第二个参数作为消息的routing key channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey(),null,me.getValue().getBytes()); } channel.close(); connection.close(); } }
在com.wdp.rabbitmq.routing包下创建Sina消费者
package com.wdp.rabbitmq.routing; import com.rabbitmq.client.*; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; public class Sina { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数3:路由key channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.zhuzhou.20201127"); channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
在com.wdp.rabbitmq.routing包下创建BaiDu消费者
package com.wdp.rabbitmq.routing; import com.rabbitmq.client.*; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; public class BaiDu { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数3:路由key channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.changsha.20201127"); channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
2.5 Topics 通配符模式
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
应用场景:还是天气的例子,比如说,一个城市下有很多个区县,区县的天气会不一样,这点类似模糊查询,我需要所有东边区县的天气,就需要通过通配符匹配
在com.wdp.rabbitmq.routing包下创建WeatherBureau
package com.wdp.rabbitmq.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; /** * * 发布者 */ public class WeatherBureau { public static void main(String[] args) throws Exception { Map area = new LinkedHashMap<String, String>(); area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据"); area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据"); area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据"); area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据"); area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据"); area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据"); area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据"); area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); Iterator<Map.Entry<String,String>> itr = area.entrySet().iterator(); while (itr.hasNext()){ Map.Entry<String, String> me = itr.next(); //第一个参数交换机名字 第二个参数作为消息的routing key channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey(),null,me.getValue().getBytes()); } channel.close(); connection.close(); } }
在com.wdp.rabbitmq.routing包下创建Sina
package com.wdp.rabbitmq.routing; import com.rabbitmq.client.*; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; public class Sina { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数3:路由key channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.zhuzhou.20201127"); channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
在com.wdp.rabbitmq.routing包下创建BaiDu
package com.wdp.rabbitmq.routing; import com.rabbitmq.client.*; import com.wdp.rabbitmq.utils.RabbitConstant; import com.wdp.rabbitmq.utils.RabbitUtils; import java.io.IOException; public class BaiDu { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数3:路由key channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.changsha.20201127"); channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。