当前位置:   article > 正文

RabbitMQ_rabbitmq创建虚拟主机

rabbitmq创建虚拟主机

一、RabbitMQ介绍

RabbitMQ 是一个开源的、基于 AMQP 协议的消息代理服务器。它可以用作消息系统,用于在分布式系统中存储和转发消息。RabbitMQ 最初由 VMware 的子公司 Rabbit Technologies 开发,现在的新名字是 Pivotal Software。

二、RabbitMQ 的基础架构

Server(服务器):RabbitMQ 服务器是核心组件,它负责处理客户端的连接、处理消息队列和路由。
Client(客户端):客户端负责与 RabbitMQ 服务器通信,发送和接收消息。客户端可以与服务器在同一台机器上,也可以部署在不同的机器上。
Message(消息):消息是 RabbitMQ 中的基本单位,它是由 sender(发送者)发送给 receiver(接收者)的数据。
Virtual Host(虚拟主机):虚拟主机是 RabbitMQ 中的一个概念,它可以用来隔离不同的交换机、队列和绑定。一个虚拟主机可以有自己独立的权限和配置。
Exchange(交换机):交换机是 RabbitMQ 中的核心组件,它负责根据消息的 routing key 将消息发送到对应的队列。
Queue(队列):队列是 RabbitMQ 的另一个核心组件,它负责存储消息直到接收者来取走它们。
Routing Key(路由键):路由键是用于指定消息应该发送到哪个队列的。RabbitMQ 使用它来决定如何路由一个消息。
Binding(绑定):绑定是将交换机与队列连接起来的组件,它指定交换机如何将消息发送到队列。
Connection(连接):连接是客户端与 RabbitMQ 服务器之间的通信路径。
Channel(通道):通道是连接中的一个概念,它允许客户端在连接上创建多个通道,以实现并发处理。

三、主要特点

可靠性:RabbitMQ 采用了持久化机制,确保了消息的可靠传输。
扩展性:RabbitMQ 支持水平扩展,可以通过添加更多的节点来增加整体性能。
灵活性:RabbitMQ 支持多种消息传输模式,如直接交换、绑定交换、主题交换等。
多平台:RabbitMQ 支持多种编程语言和平台,方便了开发人员的集成。
插件系统:RabbitMQ 有一个插件系统,可以方便地扩展其功能。

四、工作流程

生产者 将消息发送到 交换机。
交换机 根据 路由键 将消息发送到 队列。
消费者 从 队列 中获取消息

五、RabbitMQ的安装和配置

1. 安装依赖环境

RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ

Linux下载并安装rabbitmq-server-3.6.5-1.noarch.rpm_linux rabbitmq 3.6下载-CSDN博客yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xzLinux下载并安装rabbitmq-server-3.6.5-1.noarch.rpm_linux rabbitmq 3.6下载-CSDN博客

2. 下载解压缩

把下载的压缩包解压

拉入Linux

3. 安装Erlang

rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm

4. 安装RabbitMQ

#安装依赖的包

rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm

#安装rabbitmq

rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm

六、启动RabbitMQ

1. 启动命令

systemctl start rabbitmq-server # 启动服务

systemctl stop rabbitmq-server # 停止服务

systemctl restart rabbitmq-server # 重启服务

systemctl status rabbitmq-server #查看状态

2.开启管理界面及配置

rabbitmq-plugins enable rabbitmq_management

3. 修改默认配置信息

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app

loopback_users 中的 <<"guest">>,只保留guest

修改之后重启一下rabbitmq

4.浏览器访问

http://192.168.44.64:15672/

七、RabbitMQ的使用

1. 创建虚拟主机

前面添名字必须带/

修改

2. 创建用户

3.添加用户访问的虚拟主机

八、RabbitMQ入门

1. 结构和依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

2. 工具类

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class UntilChannel {
  7. /**
  8. * 获取 RabbitMQ 的 Channel
  9. *
  10. * @return Channel
  11. */
  12. public Channel getChannel() {
  13. // 创建连接工厂
  14. ConnectionFactory connectionFactory = new ConnectionFactory();
  15. // 设置 RabbitMQ 服务器地址
  16. connectionFactory.setHost("192.168.44.64");
  17. // 设置 RabbitMQ 服务器端口号
  18. connectionFactory.setPort(5672);
  19. // 设置 RabbitMQ 登录用户名
  20. connectionFactory.setUsername("user");
  21. // 设置 RabbitMQ 登录密码
  22. connectionFactory.setPassword("123456");
  23. // 设置 RabbitMQ 虚拟主机
  24. connectionFactory.setVirtualHost("/admin");
  25. Channel channel = null;
  26. try {
  27. // 创建连接
  28. Connection connection = connectionFactory.newConnection();
  29. // 创建 Channel
  30. channel = connection.createChannel();
  31. } catch (IOException e) {
  32. // 抛出运行时异常
  33. throw new RuntimeException(e);
  34. } catch (TimeoutException e) {
  35. // 抛出运行时异常
  36. throw new RuntimeException(e);
  37. }
  38. // 返回 Channel
  39. return channel;
  40. }
  41. }

3. Work queues工作队列模式

 (1) 生产者

  1. import com.aaa.until.UntilChannel;
  2. import com.rabbitmq.client.Channel;
  3. public class Test {
  4. public static void main(String[] args) throws Exception {
  5. // 创建 UntilChannel 对象
  6. UntilChannel untilChannel = new UntilChannel();
  7. // 获取 Channel 对象
  8. Channel channel = untilChannel.getChannel();
  9. // 声明队列
  10. channel.queueDeclare("testTxt", true, false, false, null);
  11. // 发布消息到队列
  12. for (int i = 0; i < 5; i++) {
  13. String body = i + " testTxt";
  14. channel.basicPublish("", "testTxt", null, body.getBytes());
  15. }
  16. }
  17. }

运行结果:

 

(2) 消费者1

  1. public class Test {
  2. public static void main(String[] args) throws Exception {
  3. // 创建 UntilChannel 实例
  4. UntilChannel untilChannel = new UntilChannel();
  5. // 获取 Channel 对象
  6. Channel channel = untilChannel.getChannel();
  7. // 创建 DefaultConsumer,并重写 handleDelivery 方法
  8. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  9. @Override
  10. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  11. // 将字节数组转换为字符串
  12. String string = new String(body);
  13. // 打印字符串
  14. System.out.println("----------" + string);
  15. }
  16. };
  17. // 开始从 "testTxt" 队列消费消息,启用自动确认机制
  18. channel.basicConsume("testTxt", true, defaultConsumer);
  19. }
  20. }

(3) 消费者2

  1. import java.io.IOException;
  2. public class Test {
  3. public static void main(String[] args) throws Exception {
  4. // 创建 UntilChannel 实例
  5. UntilChannel untilChannel = new UntilChannel();
  6. // 获取 Channel 对象
  7. Channel channel = untilChannel.getChannel();
  8. // 创建 DefaultConsumer,并重写 handleDelivery 方法
  9. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  10. @Override
  11. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  12. // 将字节数组转换为字符串
  13. String string = new String(body);
  14. // 打印字符串
  15. System.out.println("----------" + string);
  16. }
  17. };
  18. // 开始从 "testTxt" 队列消费消息,启用自动确认机制
  19. channel.basicConsume("testTxt", true, defaultConsumer);
  20. }
  21. }

4. 订阅模式类型

(1) 生产者

  1. import com.aaa.until.UntilChannel;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. public class Test2 {
  5. public static void main(String[] args) throws Exception{
  6. UntilChannel untilChannel = new UntilChannel();
  7. // 从UntilChannel获取Channel对象
  8. Channel channel = untilChannel.getChannel();
  9. // 声明交换机名称为"switchboard",类型为FANOUT
  10. channel.exchangeDeclare("switchboard", BuiltinExchangeType.FANOUT,false);
  11. // 声明队列名称为"test1"和"test2"
  12. channel.queueDeclare("test1",false,false,false,null);
  13. channel.queueDeclare("test2",false,false,false,null);
  14. // 将队列绑定到交换机"switchboard"
  15. channel.queueBind("test1","switchboard","");
  16. channel.queueBind("test2","switchboard","");
  17. // 向交换机"switchboard"发布一条消息,路由键为空字符串
  18. channel.basicPublish("switchboard","",null,"ttttt".getBytes());
  19. }
  20. }

(2)消费者一

  1. import java.io.IOException;
  2. public class Test {
  3. public static void main(String[] args) throws Exception {
  4. // 创建 UntilChannel 实例
  5. UntilChannel untilChannel = new UntilChannel();
  6. // 获取 Channel 对象
  7. Channel channel = untilChannel.getChannel();
  8. // 创建 DefaultConsumer,并重写 handleDelivery 方法
  9. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  10. @Override
  11. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  12. // 将字节数组转换为字符串
  13. String string = new String(body);
  14. // 打印字符串
  15. System.out.println("----------" + string);
  16. }
  17. };
  18. // 开始从 "test1" 队列消费消息,启用自动确认机制
  19. channel.basicConsume("test1", true, defaultConsumer);
  20. }
  21. }

(3) 消费者二

  1. import com.aaa.until.UntilChannel;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DefaultConsumer;
  5. import com.rabbitmq.client.Envelope;
  6. import java.io.IOException;
  7. public class Test {
  8. public static void main(String[] args) throws Exception {
  9. // 创建 UntilChannel 实例
  10. UntilChannel untilChannel = new UntilChannel();
  11. // 获取 Channel 对象
  12. Channel channel = untilChannel.getChannel();
  13. // 创建 DefaultConsumer,并重写 handleDelivery 方法
  14. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  15. @Override
  16. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  17. // 将字节数组转换为字符串
  18. String string = new String(body);
  19. // 打印字符串
  20. System.out.println("----------" + string);
  21. }
  22. };
  23. // 开始从 "test2" 队列消费消息,启用自动确认机制
  24. channel.basicConsume("test2", true, defaultConsumer);
  25. }
  26. }

结果:

5.路由模式

路由模式特点:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

(1) 在rabbit的页面上操作

添加一个交换机

添加两个队列

 

 

 用交换机发送信息

 

根据选择的Routing key来发布到哪个队列上

 (2) 用代码操作

工具类

  1. public class UntilChannel {
  2. /**
  3. * 获取 RabbitMQ 的 Channel
  4. *
  5. * @return Channel
  6. */
  7. public Channel getChannel() {
  8. // 创建连接工厂
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. // 设置 RabbitMQ 服务器地址
  11. connectionFactory.setHost("192.168.44.64");
  12. // 设置 RabbitMQ 服务器端口号
  13. connectionFactory.setPort(5672);
  14. // 设置 RabbitMQ 登录用户名
  15. connectionFactory.setUsername("user");
  16. // 设置 RabbitMQ 登录密码
  17. connectionFactory.setPassword("123456");
  18. // 设置 RabbitMQ 虚拟主机
  19. connectionFactory.setVirtualHost("/admin");
  20. Channel channel = null;
  21. try {
  22. // 创建连接
  23. Connection connection = connectionFactory.newConnection();
  24. // 创建 Channel
  25. channel = connection.createChannel();
  26. } catch (IOException e) {
  27. // 抛出运行时异常
  28. throw new RuntimeException(e);
  29. } catch (TimeoutException e) {
  30. // 抛出运行时异常
  31. throw new RuntimeException(e);
  32. }
  33. // 返回 Channel
  34. return channel;
  35. }
  36. }

测试类

 

  1. public class Test2 {
  2. public static void main(String[] args) throws Exception{
  3. UntilChannel untilChannel = new UntilChannel();
  4. // 从UntilChannel获取Channel对象
  5. Channel channel = untilChannel.getChannel();
  6. // 声明交换机名称为"exchange_direct_test",DIRECT
  7. channel.exchangeDeclare("exchange_direct_test1", BuiltinExchangeType.DIRECT,false);
  8. // 声明队列名称为"queue1"和"queue2"
  9. channel.queueDeclare("queue1",false,false,false,null);
  10. channel.queueDeclare("queue2",false,false,false,null);
  11. /**
  12. 参数说明:
  13. - var1 :队列名称,类型为 String 。
  14. - var2 :是否持久化队列,类型为 boolean 。如果设置为 true ,则在 RabbitMQ 服务器重启后,队列仍然存在;如果设置为 false ,则在服务器重启后,队列将被删除。
  15. - var3 :是否独占队列,类型为 boolean 。如果设置为 true ,则只允许当前连接使用该队列;如果设置为 false ,则允许多个连接使用该队列。
  16. - var4 :是否自动删除队列,类型为 boolean 。如果设置为 true ,则当最后一个消费者断开连接后,队列将被删除;如果设置为 false ,则即使没有消费者连接,队列也不会被删除。
  17. - var5 :其他参数,类型为 Map<String, Object> 。可以设置一些额外的参数,例如队列的消息过期时间、最大长度等。
  18. */
  19. // 将队列绑定到交换机"exchange_direct_test"
  20. channel.queueBind("queue1","exchange_direct_test","test1");
  21. channel.queueBind("queue2","exchange_direct_test","test2");
  22. channel.queueBind("queue2","exchange_direct_test","test2.test1");
  23. // 向交换机"exchange_direct_test"发布一条消息,路由键为空字符串
  24. channel.basicPublish("exchange_direct_test","test1",null,"一条测试信息".getBytes());
  25. /*
  26. * void basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4) throws IOException;
  27. 参数说明:
  28. - var1 :交换机名称,类型为 String 。指定要发布消息的交换机。
  29. - var2 :路由键,类型为 String 。指定将消息发送到哪个队列,可以为空字符串。
  30. - var3 :消息的基本属性,类型为 AMQP.BasicProperties 。可以设置消息的持久化、优先级、过期时间等属性。
  31. - var4 :消息的内容,类型为 byte[] 。以字节数组的形式传递消息的实际内容。
  32. * */
  33. }
  34. }

6.Topics通配符模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: test.topic

通配符规则:

#:匹配一个或多个词 

*:匹配不多不少恰好1个词   test.* test.insert

(1) 在rabbit的页面上操作

添加一个交换机

添加两个队列

分别是

queue3  queue4

修改queue3

修改queue4

推送信息

 

 

(2) 用代码操作

 

  1. public class Test2 {
  2. public static void main(String[] args) throws Exception{
  3. UntilChannel untilChannel = new UntilChannel();
  4. // 从UntilChannel获取Channel对象
  5. Channel channel = untilChannel.getChannel();
  6. // 声明交换机名称为"exchange_topic_test",TOPIC
  7. channel.exchangeDeclare("exchange_topic_test", BuiltinExchangeType.TOPIC,false);
  8. // 声明队列名称为"queue3"和"queue4"
  9. channel.queueDeclare("queue3",false,false,false,null);
  10. channel.queueDeclare("queue4",false,false,false,null);
  11. /**
  12. 参数说明:
  13. - var1 :队列名称,类型为 String 。
  14. - var2 :是否持久化队列,类型为 boolean 。如果设置为 true ,则在 RabbitMQ 服务器重启后,队列仍然存在;如果设置为 false ,则在服务器重启后,队列将被删除。
  15. - var3 :是否独占队列,类型为 boolean 。如果设置为 true ,则只允许当前连接使用该队列;如果设置为 false ,则允许多个连接使用该队列。
  16. - var4 :是否自动删除队列,类型为 boolean 。如果设置为 true ,则当最后一个消费者断开连接后,队列将被删除;如果设置为 false ,则即使没有消费者连接,队列也不会被删除。
  17. - var5 :其他参数,类型为 Map<String, Object> 。可以设置一些额外的参数,例如队列的消息过期时间、最大长度等。
  18. */
  19. // 将队列绑定到交换机"exchange_topic_test"
  20. channel.queueBind("queue3","exchange_topic_test","test.#");
  21. channel.queueBind("queue4","exchange_topic_test","test.*");
  22. // 向交换机"exchange_topic_test"发布一条消息,路由键为空字符串
  23. channel.basicPublish("exchange_topic_test","test.t1.t1",null,"一条测试信息".getBytes());
  24. /*
  25. * void basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4) throws IOException;
  26. 参数说明:
  27. - var1 :交换机名称,类型为 String 。指定要发布消息的交换机。
  28. - var2 :路由键,类型为 String 。指定将消息发送到哪个队列,可以为空字符串。
  29. - var3 :消息的基本属性,类型为 AMQP.BasicProperties 。可以设置消息的持久化、优先级、过期时间等属性。
  30. - var4 :消息的内容,类型为 byte[] 。以字节数组的形式传递消息的实际内容。
  31. * */
  32. }
  33. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/845661
推荐阅读
相关标签
  

闽ICP备14008679号