赞
踩
一对一模式,一个生产者、一个消费者,生产者发送消息,消费者消费消息。
生产者:也就是要发送消息的程序
消费者:消息的接收者,会一直等待消息到来
消息队列:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
- //生产者
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
-
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("118.31.55.110");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- connectionFactory.setVirtualHost("/");
-
- Connection connecte = connectionFactory.newConnection();
- Channel channel = connecte.createChannel();
-
- channel.queueDeclare("queue", false, false, false, null);
- String message = "Jaosn Hello World!";
-
- channel.basicPublish("","queue",null,message.getBytes());
-
- channel.close();
- connecte.close();
-
- System.out.println("=====消息发送成功======");
- }
-
- }
-
-
- //消费者
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
-
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("118.31.55.110");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- connectionFactory.setVirtualHost("/");
-
- Connection connecte = connectionFactory.newConnection();
- Channel channel = connecte.createChannel();
-
- channel.queueDeclare("queue", false, false, false, null);
-
- //从mq服务器获取数据
- channel.basicConsume("queue",false,new Reciver(channel));
-
- }
- }
-
- class Reciver extends DefaultConsumer {
-
- private Channel channel;
- //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
- public Reciver(Channel channel) {
- super(channel);
- this.channel = channel;
- }
-
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
- String message = new String(body);
- System.out.println("消费者接收到的消息:"+message);
-
- System.out.println("消息的TagId:"+envelope.getDeliveryTag());
- //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
一对多模式,一个生产者,多个消费者,一个队列,每个消费者从队列中获取唯一的消息。与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
- //订单系统
- public class OrderSystem {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = RabbitUtils.getConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
-
- for(int i = 1 ; i <= 100 ; i++) {
- SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
- String jsonSMS = new Gson().toJson(sms);
- channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
- }
- System.out.println("发送数据成功");
- channel.close();
- connection.close();
- }
- }
- //短信服务1
- public class SMSSender1 {
-
- public static void main(String[] args) throws IOException {
-
-
- Connection connection = RabbitUtils.getConnection();
- final Channel channel = connection.createChannel();
-
- channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
-
- //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
- //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
- channel.basicQos(1);//处理完一个取一个
-
- channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String jsonSMS = new String(body);
- System.out.println("SMSSender1-短信发送成功:" + jsonSMS);
-
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- channel.basicAck(envelope.getDeliveryTag() , false);
- }
- });
- }
-
-
- }
-
- //短信服务2
- public class SMSSender2 {
-
- public static void main(String[] args) throws IOException {
-
-
- Connection connection = RabbitUtils.getConnection();
- final Channel channel = connection.createChannel();
-
- channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
-
- //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
- //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
- channel.basicQos(1);//处理完一个取一个
-
- channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String jsonSMS = new String(body);
- System.out.println("SMSSender2-短信发送成功:" + jsonSMS);
-
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- channel.basicAck(envelope.getDeliveryTag() , false);
- }
- });
- }
- }
-
-
- //短信服务3
- public class SMSSender3 {
-
-
- public static void main(String[] args) throws IOException {
-
-
- Connection connection = RabbitUtils.getConnection();
- final Channel channel = connection.createChannel();
-
- channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
-
- //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
- //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
- channel.basicQos(1);//处理完一个取一个
-
- channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String jsonSMS = new String(body);
- System.out.println("SMSSender3-短信发送成功:" + jsonSMS);
-
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- channel.basicAck(envelope.getDeliveryTag() , false);
- }
- });
- }
- }
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
消费者,消息的接收者,会一直等待消息到来
消息队列,接收消息、缓存消息
交换机一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
交换机:只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
(1)图
(2)创建队列
(3)创建交换机
(4)队列跟交换机绑定
(5)代码:
- //发布者
- public class WeatherBureau {
-
-
- public static void main(String[] args) throws Exception {
- Connection connection = RabbitUtils.getConnection();
- String input = new Scanner(System.in).next();
- Channel channel = connection.createChannel();
-
- //第一个参数交换机名字 其他参数和之前的一样
- channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());
-
- channel.close();
- connection.close();
- }
- }
-
-
- //订阅者
- public class Sina {
-
- public static void main(String[] args) throws IOException {
- //获取TCP长连接
- Connection connection = RabbitUtils.getConnection();
- //获取虚拟连接
- final Channel channel = connection.createChannel();
- //声明队列信息
- channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
-
- //queueBind用于将队列与交换机绑定
- //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
- channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
- channel.basicQos(1);
- channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("新浪天气收到气象信息:" + new String(body));
- channel.basicAck(envelope.getDeliveryTag() , false);
- }
- });
- }
-
- }
-
-
-
- public class BiaDu {
-
- public static void main(String[] args) throws IOException {
- //获取TCP长连接
- Connection connection = RabbitUtils.getConnection();
- //获取虚拟连接
- final Channel channel = connection.createChannel();
- //声明队列信息
- channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
-
- //queueBind用于将队列与交换机绑定
- //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
- channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
- channel.basicQos(1);
- channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("百度天气收到气象信息:" + new String(body));
- channel.basicAck(envelope.getDeliveryTag() , false);
- }
- });
- }
(6)小结:
a、交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
b、发布订阅模式与工作队列模式的区别:
图解:
- //发布者
- public class WeatherBureau {
-
- public static void main(String[] args) throws Exception {
-
- Map area = new LinkedHashMap<String, String>();
- area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
- area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
- area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201128天气数据");
- area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
-
- area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
- area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
- area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
- area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
-
-
- Connection connection = RabbitUtils.getConnection();
- Channel channel = connection.createChannel();
-
- Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
- while (itr.hasNext()) {
- Map.Entry<String, String> me = itr.next();
- //第一个参数交换机名字 第二个参数作为 消息的routing key
- channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());
- }
-
- channel.close();
- connection.close();
- }
- }
- //消费者
- public class Sina {
- public static void main(String[] args) throws IOException {
- //获取TCP长连接
- Connection connection = RabbitUtils.getConnection();
- //获取虚拟连接
- final Channel channel = connection.createChannel();
- //声明队列信息
- channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
-
- //指定队列与交换机以及routing key之间的关系
- channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");
- channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");
- channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128");
- channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201012");
-
- channel.basicQos(1);
- channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("新浪天气收到气象信息:" + new String(body));
- channel.basicAck(envelope.getDeliveryTag() , false);
- }
- });
- }
- }
-
-
- public class BiaDu {
- public static void main(String[] args) throws IOException {
- Connection connection = RabbitUtils.getConnection();
- final Channel channel = connection.createChannel();
- channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
- //queueBind用于将队列与交换机绑定
- //参数1:队列名 参数2:交互机名 参数三:路由key
- channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127");
- channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128");
- channel.basicQos(1);
- channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("百度天气收到气象信息:" + new String(body));
- channel.basicAck(envelope.getDeliveryTag() , false);
- }
- });
- }
- }
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
- //发布者
- public class WeatherBureau {
-
-
- public static void main(String[] args) throws Exception {
-
- Map area = new LinkedHashMap<String, String>();
- area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
- area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
- area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
- area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
-
- area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
- area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
- area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
- area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
-
-
- Connection connection = RabbitUtils.getConnection();
- Channel channel = connection.createChannel();
-
- Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
- while (itr.hasNext()) {
- Map.Entry<String, String> me = itr.next();
- //第一个参数交换机名字 第二个参数作为 消息的routing key
- channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());
-
- }
-
- channel.close();
- connection.close();
- }
- }
- //消费者
- public class Sina {
-
- public static void main(String[] args) throws IOException {
- //获取TCP长连接
- Connection connection = RabbitUtils.getConnection();
- //获取虚拟连接
- final Channel channel = connection.createChannel();
- //声明队列信息
- channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
-
- //指定队列与交换机以及routing key之间的关系
- channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");
-
- channel.basicQos(1);
- channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("新浪天气收到气象信息:" + new String(body));
- channel.basicAck(envelope.getDeliveryTag() , false);
- }
- });
- }
-
- }
-
-
- public class BiaDu {
-
- public static void main(String[] args) throws IOException {
- Connection connection = RabbitUtils.getConnection();
- final Channel channel = connection.createChannel();
- channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
- //queueBind用于将队列与交换机绑定
- //参数1:队列名 参数2:交互机名 参数三:路由key
- channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127");
- channel.basicQos(1);
- channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("百度天气收到气象信息:" + new String(body));
- channel.basicAck(envelope.getDeliveryTag() , false);
- }
- });
-
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。