当前位置:   article > 正文

RabbitMQ的五种模型_rabbitmq五种消息模型

rabbitmq五种消息模型

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。

1.基本消息模型:生产者–>队列–>一个消费者
2.work消息模型:生产者–>队列–>多个消费者共同消费
3.订阅模型-Fanout:广播,将消息交给所有绑定到交换机的队列,每个消费者都可以收到同一条消息
4.订阅模型-Direct:定向,把消息交给符合指定 rotingKey 的队列(路由模式)
5.订阅模型-Topic:通配符,把消息交给符合routing pattern(主题模式)的队列

(3、4、5这三种都属于订阅模型,只不过进行路由的方式不同)

下述均为引入rabbitmq的自有依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>4.2.0</version>
  5. </dependency>

第一种:简单队列模式

单个发送者(生产者)将消息发送到队列(每个队列都有一个唯一名字)单个接受者(消费者)获取消息

 生产者示例代码:

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.*;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. /**
  8. * 简单队列模式(生产者)
  9. */
  10. public class SimpleModeProducer {
  11. private static String QUEUE_NAME = "SIMPLE_MODE";
  12. public static void main(String[] args) throws Exception {
  13. Map map =new HashMap();
  14. map.put("text", "测试内容");
  15. //1.连接远程rabbit-server服务器
  16. ConnectionFactory factory = new ConnectionFactory();
  17. factory.setHost("127.0.0.1");
  18. factory.setPort(5672);
  19. factory.setUsername("admin");
  20. factory.setPassword("admin");
  21. //2.创建一个连接
  22. Connection connection = factory.newConnection();
  23. //3.创建一个管道
  24. Channel channel = connection.createChannel();
  25. //4.定义创建一个队列
  26. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  27. //5.发送消息(注意发送和接受段相同字符集否则出现乱码)
  28. channel.basicPublish("", QUEUE_NAME, null, ser(map));
  29. System.out.println("生产成功!!");
  30. channel.close();
  31. connection.close();
  32. }
  33. /**
  34. * 将对象序列化为字节数组
  35. */
  36. private static byte[] ser(Object obj) throws IOException{
  37. ByteArrayOutputStream baos =new ByteArrayOutputStream();
  38. ObjectOutputStream oos =new ObjectOutputStream(baos);
  39. oos.writeObject(obj);
  40. return baos.toByteArray();
  41. }
  42. }

 消费者示例代码:

  1. import com.rabbitmq.client.*;
  2. import java.io.ByteArrayInputStream;
  3. import java.io.ObjectInputStream;
  4. import java.util.Map;
  5. /**
  6. * 简单队列模式(消费者)
  7. */
  8. public class SimpleModeConsumer {
  9. private static String QUEUE_NAME = "SIMPLE_MODE";
  10. public static void main(String[] args)throws Exception {
  11. //连接远程rabbit-server服务器
  12. ConnectionFactory factory = new ConnectionFactory();
  13. //MQ的服务ip
  14. factory.setHost("127.0.0.1");
  15. //端口
  16. factory.setPort(5672);
  17. factory.setUsername("admin");
  18. factory.setPassword("admin");
  19. //创建一个连接
  20. Connection connection = factory.newConnection();
  21. //创建一个管道
  22. Channel channel = connection.createChannel();
  23. //创建一个队列
  24. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  25. // 定义回调抓取消息
  26. Consumer consumer = new DefaultConsumer(channel) {
  27. @Override
  28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
  29. try {
  30. Map map = (Map) dser(body);
  31. System.out.println(map.get("text").toString());
  32. System.out.println("接收成功");
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. };
  38. channel.basicConsume(QUEUE_NAME, true, consumer);
  39. }
  40. /**
  41. * 反序列化
  42. */
  43. private static Object dser(byte[] src) throws Exception {
  44. // 从字节数组读取数据
  45. ByteArrayInputStream bis = new ByteArrayInputStream(src);
  46. // 把字节数组反序列化成对象
  47. ObjectInputStream ois = new ObjectInputStream(bis);
  48. return ois.readObject();
  49. }
  50. }

先启动消费者,然后再启动生产者,其输出结果如下:

第二种:工作队列模式

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/420372
推荐阅读
相关标签
  

闽ICP备14008679号