赞
踩
RabbitMQ 是一个开源的、基于 AMQP 协议的消息代理服务器。它可以用作消息系统,用于在分布式系统中存储和转发消息。RabbitMQ 最初由 VMware 的子公司 Rabbit Technologies 开发,现在的新名字是 Pivotal Software。
Server(服务器):RabbitMQ 服务器是核心组件,它负责处理客户端的连接、处理消息队列和路由。
Client(客户端):客户端负责与 RabbitMQ 服务器通信,发送和接收消息。客户端可以与服务器在同一台机器上,也可以部署在不同的机器上。
Message(消息):消息是 RabbitMQ 中的基本单位,它是由 sender(发送者)发送给 receiver(接收者)的数据。
Virtual Host(虚拟主机):虚拟主机是 RabbitMQ 中的一个概念,它可以用来隔离不同的交换机、队列和绑定。一个虚拟主机可以有自己独立的权限和配置。
Exchange(交换机):交换机是 RabbitMQ 中的核心组件,它负责根据消息的 routing key 将消息发送到对应的队列。
Queue(队列):队列是 RabbitMQ 的另一个核心组件,它负责存储消息直到接收者来取走它们。
Routing Key(路由键):路由键是用于指定消息应该发送到哪个队列的。RabbitMQ 使用它来决定如何路由一个消息。
Binding(绑定):绑定是将交换机与队列连接起来的组件,它指定交换机如何将消息发送到队列。
Connection(连接):连接是客户端与 RabbitMQ 服务器之间的通信路径。
Channel(通道):通道是连接中的一个概念,它允许客户端在连接上创建多个通道,以实现并发处理。
可靠性:RabbitMQ 采用了持久化机制,确保了消息的可靠传输。
扩展性:RabbitMQ 支持水平扩展,可以通过添加更多的节点来增加整体性能。
灵活性:RabbitMQ 支持多种消息传输模式,如直接交换、绑定交换、主题交换等。
多平台:RabbitMQ 支持多种编程语言和平台,方便了开发人员的集成。
插件系统:RabbitMQ 有一个插件系统,可以方便地扩展其功能。
生产者 将消息发送到 交换机。
交换机 根据 路由键 将消息发送到 队列。
消费者 从 队列 中获取消息
RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ
Linux下载并安装rabbitmq-server-3.6.5-1.noarch.rpm_linux rabbitmq 3.6下载-CSDN博客yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xzLinux下载并安装rabbitmq-server-3.6.5-1.noarch.rpm_linux rabbitmq 3.6下载-CSDN博客
把下载的压缩包解压
拉入Linux
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
#安装依赖的包
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
#安装rabbitmq
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
systemctl start rabbitmq-server # 启动服务
systemctl stop rabbitmq-server # 停止服务
systemctl restart rabbitmq-server # 重启服务
systemctl status rabbitmq-server #查看状态
rabbitmq-plugins enable rabbitmq_management
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app
loopback_users 中的 <<"guest">>,只保留guest
修改之后重启一下rabbitmq
前面添名字必须带/
修改
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class UntilChannel {
- /**
- * 获取 RabbitMQ 的 Channel
- *
- * @return Channel
- */
- public Channel getChannel() {
- // 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 设置 RabbitMQ 服务器地址
- connectionFactory.setHost("192.168.44.64");
- // 设置 RabbitMQ 服务器端口号
- connectionFactory.setPort(5672);
- // 设置 RabbitMQ 登录用户名
- connectionFactory.setUsername("user");
- // 设置 RabbitMQ 登录密码
- connectionFactory.setPassword("123456");
- // 设置 RabbitMQ 虚拟主机
- connectionFactory.setVirtualHost("/admin");
- Channel channel = null;
- try {
- // 创建连接
- Connection connection = connectionFactory.newConnection();
- // 创建 Channel
- channel = connection.createChannel();
- } catch (IOException e) {
- // 抛出运行时异常
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- // 抛出运行时异常
- throw new RuntimeException(e);
- }
- // 返回 Channel
- return channel;
- }
- }

- import com.aaa.until.UntilChannel;
- import com.rabbitmq.client.Channel;
-
- public class Test {
- public static void main(String[] args) throws Exception {
- // 创建 UntilChannel 对象
- UntilChannel untilChannel = new UntilChannel();
- // 获取 Channel 对象
- Channel channel = untilChannel.getChannel();
-
- // 声明队列
- channel.queueDeclare("testTxt", true, false, false, null);
-
- // 发布消息到队列
- for (int i = 0; i < 5; i++) {
- String body = i + " testTxt";
- channel.basicPublish("", "testTxt", null, body.getBytes());
- }
- }
- }

运行结果:
- public class Test {
- public static void main(String[] args) throws Exception {
- // 创建 UntilChannel 实例
- UntilChannel untilChannel = new UntilChannel();
- // 获取 Channel 对象
- Channel channel = untilChannel.getChannel();
- // 创建 DefaultConsumer,并重写 handleDelivery 方法
- DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 将字节数组转换为字符串
- String string = new String(body);
- // 打印字符串
- System.out.println("----------" + string);
- }
- };
- // 开始从 "testTxt" 队列消费消息,启用自动确认机制
- channel.basicConsume("testTxt", true, defaultConsumer);
- }
- }

- import java.io.IOException;
-
- public class Test {
- public static void main(String[] args) throws Exception {
- // 创建 UntilChannel 实例
- UntilChannel untilChannel = new UntilChannel();
-
- // 获取 Channel 对象
- Channel channel = untilChannel.getChannel();
-
- // 创建 DefaultConsumer,并重写 handleDelivery 方法
- DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 将字节数组转换为字符串
- String string = new String(body);
-
- // 打印字符串
- System.out.println("----------" + string);
- }
- };
- // 开始从 "testTxt" 队列消费消息,启用自动确认机制
- channel.basicConsume("testTxt", true, defaultConsumer);
- }
- }

- import com.aaa.until.UntilChannel;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
-
- public class Test2 {
- public static void main(String[] args) throws Exception{
- UntilChannel untilChannel = new UntilChannel();
-
- // 从UntilChannel获取Channel对象
- Channel channel = untilChannel.getChannel();
-
- // 声明交换机名称为"switchboard",类型为FANOUT
- channel.exchangeDeclare("switchboard", BuiltinExchangeType.FANOUT,false);
-
- // 声明队列名称为"test1"和"test2"
- channel.queueDeclare("test1",false,false,false,null);
- channel.queueDeclare("test2",false,false,false,null);
-
- // 将队列绑定到交换机"switchboard"
- channel.queueBind("test1","switchboard","");
- channel.queueBind("test2","switchboard","");
-
- // 向交换机"switchboard"发布一条消息,路由键为空字符串
- channel.basicPublish("switchboard","",null,"ttttt".getBytes());
- }
- }

- import java.io.IOException;
-
- public class Test {
- public static void main(String[] args) throws Exception {
- // 创建 UntilChannel 实例
- UntilChannel untilChannel = new UntilChannel();
- // 获取 Channel 对象
- Channel channel = untilChannel.getChannel();
- // 创建 DefaultConsumer,并重写 handleDelivery 方法
- DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 将字节数组转换为字符串
- String string = new String(body);
- // 打印字符串
- System.out.println("----------" + string);
- }
- };
- // 开始从 "test1" 队列消费消息,启用自动确认机制
- channel.basicConsume("test1", true, defaultConsumer);
- }
- }

- import com.aaa.until.UntilChannel;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
-
-
- import java.io.IOException;
-
- public class Test {
- public static void main(String[] args) throws Exception {
- // 创建 UntilChannel 实例
- UntilChannel untilChannel = new UntilChannel();
-
- // 获取 Channel 对象
- Channel channel = untilChannel.getChannel();
-
- // 创建 DefaultConsumer,并重写 handleDelivery 方法
- DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 将字节数组转换为字符串
- String string = new String(body);
-
- // 打印字符串
- System.out.println("----------" + string);
- }
- };
- // 开始从 "test2" 队列消费消息,启用自动确认机制
- channel.basicConsume("test2", true, defaultConsumer);
- }
- }

结果:
路由模式特点:
RoutingKey
(路由key)RoutingKey
。Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息添加一个交换机
添加两个队列
用交换机发送信息
根据选择的Routing key来发布到哪个队列上
工具类
- public class UntilChannel {
- /**
- * 获取 RabbitMQ 的 Channel
- *
- * @return Channel
- */
- public Channel getChannel() {
- // 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 设置 RabbitMQ 服务器地址
- connectionFactory.setHost("192.168.44.64");
- // 设置 RabbitMQ 服务器端口号
- connectionFactory.setPort(5672);
- // 设置 RabbitMQ 登录用户名
- connectionFactory.setUsername("user");
- // 设置 RabbitMQ 登录密码
- connectionFactory.setPassword("123456");
- // 设置 RabbitMQ 虚拟主机
- connectionFactory.setVirtualHost("/admin");
- Channel channel = null;
- try {
- // 创建连接
- Connection connection = connectionFactory.newConnection();
- // 创建 Channel
- channel = connection.createChannel();
- } catch (IOException e) {
- // 抛出运行时异常
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- // 抛出运行时异常
- throw new RuntimeException(e);
- }
- // 返回 Channel
- return channel;
- }
- }

测试类
- public class Test2 {
- public static void main(String[] args) throws Exception{
- UntilChannel untilChannel = new UntilChannel();
- // 从UntilChannel获取Channel对象
- Channel channel = untilChannel.getChannel();
- // 声明交换机名称为"exchange_direct_test",DIRECT
- channel.exchangeDeclare("exchange_direct_test1", BuiltinExchangeType.DIRECT,false);
- // 声明队列名称为"queue1"和"queue2"
- channel.queueDeclare("queue1",false,false,false,null);
- channel.queueDeclare("queue2",false,false,false,null);
- /**
- 参数说明:
- - var1 :队列名称,类型为 String 。
- - var2 :是否持久化队列,类型为 boolean 。如果设置为 true ,则在 RabbitMQ 服务器重启后,队列仍然存在;如果设置为 false ,则在服务器重启后,队列将被删除。
- - var3 :是否独占队列,类型为 boolean 。如果设置为 true ,则只允许当前连接使用该队列;如果设置为 false ,则允许多个连接使用该队列。
- - var4 :是否自动删除队列,类型为 boolean 。如果设置为 true ,则当最后一个消费者断开连接后,队列将被删除;如果设置为 false ,则即使没有消费者连接,队列也不会被删除。
- - var5 :其他参数,类型为 Map<String, Object> 。可以设置一些额外的参数,例如队列的消息过期时间、最大长度等。
- */
- // 将队列绑定到交换机"exchange_direct_test"
- channel.queueBind("queue1","exchange_direct_test","test1");
- channel.queueBind("queue2","exchange_direct_test","test2");
- channel.queueBind("queue2","exchange_direct_test","test2.test1");
- // 向交换机"exchange_direct_test"发布一条消息,路由键为空字符串
- channel.basicPublish("exchange_direct_test","test1",null,"一条测试信息".getBytes());
- /*
- * void basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4) throws IOException;
- 参数说明:
- - var1 :交换机名称,类型为 String 。指定要发布消息的交换机。
- - var2 :路由键,类型为 String 。指定将消息发送到哪个队列,可以为空字符串。
- - var3 :消息的基本属性,类型为 AMQP.BasicProperties 。可以设置消息的持久化、优先级、过期时间等属性。
- - var4 :消息的内容,类型为 byte[] 。以字节数组的形式传递消息的实际内容。
- * */
- }
- }

Topic
类型与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符
!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: test.topic
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词 test.* test.insert
(1) 在rabbit的页面上操作
添加一个交换机
添加两个队列
分别是
queue3 queue4
修改queue3
修改queue4
推送信息
(2) 用代码操作
- public class Test2 {
- public static void main(String[] args) throws Exception{
- UntilChannel untilChannel = new UntilChannel();
- // 从UntilChannel获取Channel对象
- Channel channel = untilChannel.getChannel();
- // 声明交换机名称为"exchange_topic_test",TOPIC
- channel.exchangeDeclare("exchange_topic_test", BuiltinExchangeType.TOPIC,false);
- // 声明队列名称为"queue3"和"queue4"
- channel.queueDeclare("queue3",false,false,false,null);
- channel.queueDeclare("queue4",false,false,false,null);
- /**
- 参数说明:
- - var1 :队列名称,类型为 String 。
- - var2 :是否持久化队列,类型为 boolean 。如果设置为 true ,则在 RabbitMQ 服务器重启后,队列仍然存在;如果设置为 false ,则在服务器重启后,队列将被删除。
- - var3 :是否独占队列,类型为 boolean 。如果设置为 true ,则只允许当前连接使用该队列;如果设置为 false ,则允许多个连接使用该队列。
- - var4 :是否自动删除队列,类型为 boolean 。如果设置为 true ,则当最后一个消费者断开连接后,队列将被删除;如果设置为 false ,则即使没有消费者连接,队列也不会被删除。
- - var5 :其他参数,类型为 Map<String, Object> 。可以设置一些额外的参数,例如队列的消息过期时间、最大长度等。
- */
- // 将队列绑定到交换机"exchange_topic_test"
- channel.queueBind("queue3","exchange_topic_test","test.#");
- channel.queueBind("queue4","exchange_topic_test","test.*");
- // 向交换机"exchange_topic_test"发布一条消息,路由键为空字符串
- channel.basicPublish("exchange_topic_test","test.t1.t1",null,"一条测试信息".getBytes());
- /*
- * void basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4) throws IOException;
- 参数说明:
- - var1 :交换机名称,类型为 String 。指定要发布消息的交换机。
- - var2 :路由键,类型为 String 。指定将消息发送到哪个队列,可以为空字符串。
- - var3 :消息的基本属性,类型为 AMQP.BasicProperties 。可以设置消息的持久化、优先级、过期时间等属性。
- - var4 :消息的内容,类型为 byte[] 。以字节数组的形式传递消息的实际内容。
- * */
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。