赞
踩
官网:https://www.rabbitmq.com/
什么是RabbitMQ,官方给出来这样的解释:
RabbitMQ is the most widely deployed open source message broker.
With tens of thousands of users, RabbitMQ is one of the most popular open source message brokers. From T-Mobile to Runtastic, RabbitMQ is used worldwide at small startups and large enterprises.
RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.
RabbitMQ runs on many operating systems and cloud environments, and provides a wide range of developer tools for most popular languages.
翻译以后:
RabbitMQ是部署最广泛的开源消息代理。
RabbitMQ拥有成千上万的用户,是最受欢迎的开源消息代理之一。从T-Mobile 到Runtastic,RabbitMQ在全球范围内的小型初创企业和大型企业中都得到使用。
RabbitMQ轻巧,易于在内部和云中部署。它支持多种消息传递协议。RabbitMQ可以部署在分布式和联合配置中,以满足大规模,高可用性的要求。
RabbitMQ可在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发人员工具
简单概述:
RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征。
1:下载地址:https://www.rabbitmq.com/download.html
2:环境准备:CentOS7.x+ / Erlang
RabbitMQ是采用Erlang语言开发的,所以系统环境必须提供Erlang环境,第一步就是安装Erlang。
erlang和RabbitMQ版本的按照比较: https://www.rabbitmq.com/which-erlang.html
查看系统版本号
[root@iZm5eauu5f1ulwtdgwqnsbZ ~]# lsb_release -a
LSB Version: :core-4.1-amd64:core-4.1-noarch
Distributor ID: CentOS
Description: CentOS Linux
release 8.3.2011
Release: 8.3.2011
Codename: n/a
参考地址:https://www.erlang-solutions.com/downloads/
wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
yum install -y erlang
erl -v
yum install -y socat
下载地址:https://www.rabbitmq.com/download.html
点击上诉Centos8.x后 会给你下载一个安装包 自己上传到服务器 然后用下面命令进行下载
rpm -Uvh rabbitmq-server-3.9.13-1.el8.noarch.rpm(安装包名)
# 启动服务
> systemctl start rabbitmq-server
# 查看服务状态
> systemctl status rabbitmq-server
# 停止服务
> systemctl stop rabbitmq-server
# 开机启动服务
> systemctl enable rabbitmq-server
自己查看一个elang和rabbitmq版本有没有对应
RabbitMQ默认情况下有一个配置文件,定义了RabbitMQ的相关配置信息,默认情况下能够满足日常的开发需求。如果需要修改需要,需要自己创建一个配置文件进行覆盖。
参考官网:
1:https://www.rabbitmq.com/documentation.html
2:https://www.rabbitmq.com/configure.html
3:https://www.rabbitmq.com/configure.html#config-items
4:https://github.com/rabbitmq/rabbitmq-server/blob/add-debug-messages-to-quorum_queue_SUITE/docs/rabbitmq.conf.example
5672:RabbitMQ的通讯端口
25672:RabbitMQ的节点间的CLI通讯端口是
15672:RabbitMQ HTTP_API的端口,管理员用户才能访问,用于管理RabbitMQ,需要启动Management插件。
1883,8883:MQTT插件启动时的端口。
61613、61614:STOMP客户端插件启用的时候的端口。
15674、15675:基于webscoket的STOMP端口和MOTT端口
一定要注意:RabbitMQ 在安装完毕以后,会绑定一些端口,如果你购买的是阿里云或者腾讯云相关的服务器一定要在安全组中把对应的端口添加到防火墙。
rabbitmq-plugins enable rabbitmq_management
说明:rabbitmq有一个默认账号和密码是:
guest
默认情况只能在localhost本机下访问,所以需要添加一个远程登录的用户。
systemctl restart rabbitmq-server
一定要记住,在对应服务器(阿里云,腾讯云等)的安全组中开放
15672
的端口。
ip:端口
但无法用guest访问到 所以需要下面进行授权用户
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
用户级别:
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl add_user 账号 密码
rabbitmqctl set_user_tags 账号 administrator
rabbitmqctl change_password Username Newpassword 修改密码
rabbitmqctl delete_user Username 删除用户
rabbitmqctl list_users 查看用户清单
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*" 为用户设置administrator角色rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
docker pull rabbitmq:management
docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management
—hostname:指定容器主机名称
—name:指定容器名称
-p:将mq端口号映射到本地
或者运行时设置用户和密码
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
查看日志
docker logs -f myrabbit
使用 http://你的IP地址:15672
访问rabbit控制台
> more xxx.log 查看日记信息
> netstat -naop | grep 5672 查看端口是否被占用
> ps -ef | grep 5672 查看进程
> systemctl stop 服务
1:jdk1.8
2:构建一个maven工程
3:导入rabbitmq的maven依赖
4:启动rabbitmq-server服务
5:定义生产者
6:定义消费者
7:观察消息的在rabbitmq-server服务中的过程
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.2.5.RELEASE</version>
</dependency><dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
上面根据自己的项目环境进行选择即可。
番外:rabbitmq和spring同属一个公司开放的产品,所以他们的支持也是非常完善,这也是为什么推荐使用rabbitmq的一个原因。
systemctl start rabbitmq-server或者docker start myrabbit
package com.xuexiangban.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author: 学相伴-飞哥 * @description: Producer 简单队列生产者 * @Date : 2021/3/2 */ public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 // http://120.79.148.248:15672/#/queues connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("提供者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ String queue = "queue1"; channel.queueDeclare(queue, false, false, false, null); // 6: 准备发送消息的内容 String message = "你好,一起学习吧!!!"; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routing // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish("", queue, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
connection error; protocol method: #method<connection.close>(reply-code=530,
是因为自己的admin角色没有授权
(1)点击角色
(2)设置权限 (需要点击Set permission)
在执行
package com.xuexiangban.rabbitmq.simple; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("提供者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); channel.basicConsume("queue1", true, new DeliverCallback() { public void handle(String s, Delivery delivery) throws IOException { System.out.println("收到的消息是: "+new String(delivery.getBody(),"UTF-8")); } }, new CancelCallback() { public void handle(String consumerTag) throws IOException { System.out.println("接受失败了"); } }); System.out.println("接收消息成功!"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("收到消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。
核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。
参考官网:https://www.rabbitmq.com/getstarted.html
参考官网:https://www.rabbitmq.com/getstarted.html
在使用之前 需要在客户端中自行创建exchange
注意 原先在上诉例子上已经创建了queue1 如果队列被你删除了 请自己在自行创建 或者代码里声明
package com.xuexiangban.rabbitmq.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author: 学相伴-飞哥 * @description: Producer 简单队列生产者 * @Date : 2021/3/2 */ public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 6: 准备发送消息的内容 String message = "你好,学相伴!!!"; String exchangeName = "fanout-exchange"; String routingKey = ""; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routingkey // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者
package com.xuexiangban.rabbitmq.fanout; import com.rabbitmq.client.*; import java.io.IOException; /** * @author: 学相伴-飞哥 * @description: Consumer * @Date : 2021/3/2 */ public class Consumer { private static Runnable runnable = () -> { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //获取队列的名称 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 //channel.queueDeclare("queue1", false, false, false, null); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println(queueName + ":开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } }; public static void main(String[] args) { // 启动三个线程去执行 new Thread(runnable, "queue1").start(); // new Thread(runnable, "queue2").start(); // new Thread(runnable, "queue3").start(); } }
参考官网:https://www.rabbitmq.com/getstarted.html
生产者通过交换机绑定的路由
动态的发布不同的信息 消费者通过自己绑定的路由可以接收不一样的信息
需要先自行创建direct-exchange 然后创建俩个队列 之后在交换机中绑定对应的路由
生产者
package com.xuexiangban.rabbitmq.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author: 学相伴-飞哥 * @description: Producer 简单队列生产者 * @Date : 2021/3/2 */ public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 如果没有创建queue1的话,就不能注释掉这段代码 // String queue = "queue1"; // channel.queueDeclare(queue, false, false, true, null); // 6: 准备发送消息的内容 String message1 = "你好,学相伴!!!direct-exchange"; String message2 = "你好,学相伴!!!direct-exchanges"; String exchangeName = "direct-exchange"; /// direct 模式 比fanout多了个路由而已// String routingKey1 = "ssm"; String routingKey2 = "weixin"; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routingkey // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish(exchangeName, routingKey1, null, message1.getBytes()); channel.basicPublish(exchangeName, routingKey2, null, message2.getBytes()); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者
package com.xuexiangban.rabbitmq.routing; import com.rabbitmq.client.*; import java.io.IOException; /** * @author: 学相伴-飞哥 * @description: Consumer * @Date : 2021/3/2 */ public class Consumer { private static Runnable runnable = new Runnable() { public void run() { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //获取队列的名称 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 // channel.queueDeclare("queue1", false, false, false, null); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicConsume(queueName, true, new DeliverCallback() { public void handle(String s, Delivery delivery) throws IOException { System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { public void handle(String s) throws IOException { } }); System.out.println(queueName + ":开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("接收消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }; public static void main(String[] args) { // 启动三个线程去执行 new Thread(runnable, "queue1").start(); new Thread(runnable, "queue2").start(); } }
参考官网:https://www.rabbitmq.com/getstarted.html
先创建top-exchange交换机 之后在交换机绑定对应队列的路由
queue1 com.#
ueue2 *.course.#
queue3 (*.*.user)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hZvwQ7WG-1648891350790)(C:\Users\HONOR\AppData\Roaming\Typora\typora-user-images\image-20220401103729973.png)]
package com.xuexiangban.rabbitmq.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author: 学相伴-飞哥 * @description: Producer 简单队列生产者 * @Date : 2021/3/2 */ public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 6: 准备发送消息的内容 String message = "你好,学相伴!!!"; String exchangeName = "top-exchange"; String routingKey1 = "com.course.order";//都可以收到 queue-1 queue-2 String routingKey2 = "com.order.user";//都可以收到 queue-1 queue-3 // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routingkey // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish(exchangeName, routingKey1, null, message.getBytes()); channel.basicPublish(exchangeName, routingKey2, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者
package com.xuexiangban.rabbitmq.topic; import com.rabbitmq.client.*; import java.io.IOException; /** * @author: 学相伴-飞哥 * @description: Consumer * @Date : 2021/3/2 */ public class Consumer { private static Runnable runnable = () -> { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //获取队列的名称 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 //channel.queueDeclare("queue1", false, false, false, null); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicConsume(queueName, true, new DeliverCallback() { public void handle(String s, Delivery delivery) throws IOException { System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { public void handle(String s) throws IOException { } }); System.out.println(queueName + ":开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("接收消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } }; public static void main(String[] args) { // 启动三个线程去执行 new Thread(runnable, "queue1").start(); new Thread(runnable, "queue2").start(); new Thread(runnable, "queue3").start(); } }
参考官网:https://www.rabbitmq.com/getstarted.html
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
生产者
package com.xuexiangban.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author: 学相伴-飞哥 * @description: Producer 简单队列生产者 * @Date : 2021/3/2 */ public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 6: 准备发送消息的内容 //===============================end topic模式================================== for (int i = 1; i <= 20; i++) { //消息的内容 String msg = "学相伴:" + i; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routingkey // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish("", "queue1", null, msg.getBytes()); } System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者1
package com.xuexiangban.rabbitmq.work; import com.rabbitmq.client.*; import java.io.IOException; /** * @author: 学相伴-飞哥 * @description: Consumer * @Date : 2021/3/2 */ public class Work1 { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者-Work1"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 // channel.queueDeclare("queue1", false, false, false, null); // 同一时刻,服务器只会推送一条消息给消费者 // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue1", true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try{ System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(2000); }catch(Exception ex){ ex.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println("Work1-开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者2
package com.xuexiangban.rabbitmq.work; import com.rabbitmq.client.*; import java.io.IOException; /** * @author: 学相伴-飞哥 * @description: Consumer * @Date : 2021/3/2 */ public class Work2 { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者-Work2"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 //channel.queueDeclare("queue1", false, true, false, null); // 同一时刻,服务器只会推送一条消息给消费者 //channel.basicQos(1); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue2", true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try{ System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(200); }catch(Exception ex){ ex.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println("Work2-开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
work1和work2的消息处理能力不同,但是最后处理的消息条数相同,是“按均分配”。
参考官网:https://www.rabbitmq.com/getstarted.html
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
生产者同上诉例题一样
消费者3*
package com.xuexiangban.rabbitmq.work; import com.rabbitmq.client.*; import java.io.IOException; /** * @author: 学相伴-飞哥 * @description: Consumer * @Date : 2021/3/2 */ public class Work3 { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者-Work3"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 // channel.queueDeclare("queue1", false, false, false, null); // 同一时刻,服务器只会推送一条消息给消费者 // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicQos(1); //关闭手动应答 finalChannel.basicConsume("queue8", false, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try{ System.out.println("Work3-收到消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(2000); finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }catch(Exception ex){ ex.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println("Work3-开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者4
package com.xuexiangban.rabbitmq.work; import com.rabbitmq.client.*; import java.io.IOException; /** * @author: 学相伴-飞哥 * @description: Consumer * @Date : 2021/3/2 */ public class Work4 { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("120.79.148.248"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者-Work4"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 //channel.queueDeclare("queue1", false, true, false, null); // 同一时刻,服务器只会推送一条消息给消费者 //channel.basicQos(1); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue8", false, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try{ System.out.println("Work4-收到消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(200); finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }catch(Exception ex){ ex.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println("Work4-开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
从结果可以看到,消费者1在相同时间内,处理了更多的消息;以上代码我们实现了公平分发模式;
公平分发需要消费者开启手动应答,关闭自动应答
(1)当队列里消息较多时,我们通常会开启多个消费者处理消息;公平分发和轮询分发都是我们经常使用的模式。
(2)轮询分发的主要思想是“按均分配”,不考虑消费者的处理能力,所有消费者均分;这种情况下,处理能力弱的服务器,一直都在处理消息,而处理能力强的服务器,在处理完消息后,处于空闲状态;
(3) 公平分发的主要思想是”能者多劳”,按需分配,能力强的干的多。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。