赞
踩
rabbitmq是基于AMQP(高级消息队列协议)的开源消息中间件,rabbitmq服务端使用Erlang语言开发。消息中间件对于构建复杂应用软件来说作用主要体现在:
解耦异构系统数据交互
高并发情况下起到削峰作用
保证数据顺序被处理
保证数据可靠被处理
较好的扩展性
rabbitmq目前在互联网应用的使用非常流行,主要原因如下:
支持多种协议:AMQP、STOMP、MQTT等
提供多语言客户端:java、python、JavaScript、PHP、c#等
数据可靠性:支持类似ACK确认机制保证数据正确被消费
高可用:可以将队列在集器中不同节点设置镜像的方式保证队列高可用
admin管理界面:提供一个用户交互后台管理页面操作查看队列、用户等信息
插件式扩展:通过插件的方式进行功能的扩展
前面我们说过rabbitmq服务端由erlang语言开发,所以想要运行rabbitmq服务端必须先安装erlang语言环境。下面我们通过源码编译的方式在linux上安装erlang,安装之前我们需要知道不同版本的rabbitmq对应的erlang版本是有要求的,下面先了解下rabbitmq和erlang版本的对应关系表:
Rabbitmq版本 | Erlang/OTP最低版本 | Erlang/OTP最高版本 |
3.8.2 3.8.1 3.8.0 | 21.3 | 22.x |
3.7.24 3.7.23 3.7.22 3.7.21 3.7.20 3.7.19 | 21.3 | 22.x |
3.7.18 3.7.17 3.7.16 3.7.15 | 20.3 | 22.x |
3.7.14 3.7.13 3.7.12 3.7.11 | 20.3 | 21.3.x |
3.7.10 3.7.9 3.7.8 3.7.7 | 19.3 | 21.x |
3.7.6 3.7.5 3.7.4 3.7.3 3.7.2 3.7.1 3.7.0 | 19.3 | 20.3.x |
下面我们选择当前较新的配套版本进行安装。
Rabbitmq版本 | Erlang/OTP版本 |
3.8.0 | 22.0 |
选择好版本之后,作为合格的工程师,接下来我们应该为linux操作系统规划下目录,笔者这里目录规划如下:
目录 | 说明 |
/opt | 自定义软件根目录 |
/opt/src | 放置源码 |
/opt/tarball | 放置tarball文件,例如xxx.tar.gz |
/opt/soft | 放置安装好的软件 |
/opt/logs | 放置日志文件 |
/opt/data | 放置数据文件 |
/opt/backup | 放置备份文件 |
规划好目录后将全部目录通过mkdir -p 命令创建好,并通过ll命令查看目录权限情况。
准备就绪后我们开始先安装erlang,具体步骤如下:
1、访问erlang官方下载地址:
https://www.erlang.org/downloads
2、进入后选择我们上面规划的22.0版本:
3、进入后这里我们选择源码文件:
4、右键复制链接地址,然后ssh登录linux系统进入/opt/tarball目录下,通过wget命令进行下载:
- cd /opt/tarball
- wget http://erlang.org/download/otp_src_22.0.tar.gz
5、从/opt/tarball目录解压erlang到/opt/src目录下:
tar -xvf otp_src_22.0.tar.gz -C /opt/src/
6、安装erlang外部依赖包:
- yum install -y gcc
- yum install -y perl
- yum install -y ncurses-devel
- yum install -y openssl openssl-devel
- yum -y install unixODBC-devel
7、通过编译源码安装erlang:
- cd /opt/src/otp_src_22.0
- ./configure --prefix=/opt/soft/erlang --without-javac
- make
- make install
8、配置环境变量(vi /etc/profile):
通过vi /etc/profile编辑profile文件,在文件后面新增下面内容:
- export ERLANG_HOME=/opt/soft/erlang
- export PATH=$PATH:$ERLANG_HOME/bin
执行source /etc/profile使配置生效。
9、验证安装情况(erl):
出现如上图证明安装成功!安装好erlang语言后接下来继续安装rabbitmq,具体步骤如下:
1、访问rabbtimq github realeas页面:
https://github.com/rabbitmq/rabbitmq-server/releases
2、进入后通过tag切换为3.8.0版本,然后拉到最后选择版本为3.8.0的二进制安装包:
3、进入/opt/tarball目录通过wget命令进行下载:
- cd /opt/tarball
- wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.0/rabbitmq-server-generic-unix-3.8.0.tar.xz
4、从/opt/tarball目录解压rabbitmq二进制包到/opt/soft目录下:
tar -xvf rabbitmq-server-generic-unix-3.8.0.tar.xz -C /opt/soft/
5、配置环境变量:
通过vi /etc/profile编辑profile文件,将上面新增的内容重新修改为:
- export ERLANG_HOME=/opt/soft/erlang
- export RABBITMQ_HOME=/opt/soft/rabbitmq_server-3.8.0
- export PATH=$PATH:$ERLANG_HOME/bin:$RABBITMQ_HOME/sbin
执行source /etc/profile使配置生效。
6、启动RabbitMQ服务:
- rabbitmq-server
- 或者后台方式启动:
- rabbitmq-server -detached
出现如下图证明成功安装并启动rabbtimq服务:
7、查看rabbitmq服务运行状态:
rabbitmqctl status
8、停止服务:
rabbitmqctl stop
rabbitmqctl命令是rabbitmq当前节点服务端的命令行客户端,通过该命令可以获取服务端的信息以及和服务端进行交互。
上面我们安装好了rabbitmq服务,那么接下来我们就通过helloworld的方式来体验一把,这里我们使用java语言进行开发,通过maven构建工具管理项目依赖,通过IDEA编辑器进行代码编写,具体步骤如下:
1、通过IDEA创建一个普通maven项目,名称笔者这里为:lazy-course-rabbitmq:
2、访问github,找到rabbitmq-java-client项目的pom依赖坐标并添加到项目pom.xml:
- JDK8以上的可以使用下面版本坐标(笔者使用这个):
-
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.8.0</version>
- </dependency>
-
- JDK8以下的可以使用下面版本坐标:
-
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>4.11.3</version>
- </dependency>
3、分别创建发送者和接收者类,关键代码如下:
发送者Send.java :
- public class Send {
-
-
- public static void main(String[] args) throws Exception {
-
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.137.101");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- //public final static String HELLO_QUEUE_NAME = "hello";
- channel.queueDeclare(Const.HELLO_QUEUE_NAME, false, false, false, null);
- String message = "Hello World!";
- channel.basicPublish("", Const.HELLO_QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
-
-
- }
- }
接收者Recv.java :
- public class Recv {
-
-
- public static void main(String[] argv) throws Exception {
-
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.137.101");
- factory.setPort(5672);
-
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
-
- //public final static String HELLO_QUEUE_NAME = "hello";
- //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- channel.queueDeclare(Const.HELLO_QUEUE_NAME, false, false, false, null);
-
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
-
- DeliverCallback deliverCallback = new DeliverCallback() {
- public void handle(String s, Delivery delivery) throws IOException {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- }
- };
-
-
- //basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
- channel.basicConsume(Const.HELLO_QUEUE_NAME, true, deliverCallback, new CancelCallback() {
- public void handle(String s) throws IOException {
- }
- });
- }
- }
4、分别运行发送者和接收者main方法,可以看到控制台输出如下:
发送者控制台输出 :
报错了,提示没有权限!
原因是rabbitmq服务默认启动后用户名和密码均为:guest,guest这个用户默认只能通过localhost的方式访问,不支持在别的服务器通过ip的方式远程去访问,当然可以通过配置允许guest远程访问,但是强烈不建议这么做!最佳的方式删除guset用户,然后创建一个新的账号,并赋予足够的服务器权限,创建方式如下:
- 列出当前用户列表:
rabbitmqctl list_users
- 删除guest用户:
rabbitmqctl delete_user guest
- 创建新用户(标签为空非管理员):
rabbitmqctl add_user lazy 111111
- 为lazy用户设置管理员标签:
rabbitmqctl set_user_tags lazy administrator
- 查看并创建虚拟主机(rabbitmq支持多租户模式,任何队列都必须附属在某个虚拟主机上,类似nginx的虚拟主机server{}的概念,安装后默认虚拟主机名称/,guest用户拥有/虚拟主机的权限):
- rabbitmqctl list_vhosts
- rabbitmqctl add_vhost vhost_hello
- 设置虚拟机最大连接数和队列数限制参数:
rabbitmqctl set_vhost_limits -p vhost_hello '{"max-connections": 200, "max-queues": 1024}'
- 查看lazy用户拥有的虚拟机权限
rabbitmqctl list_user_permissions lazy
- 给lazy用户添加使用vhost_hello虚拟机资源的权限:
rabbitmqctl set_permissions -p vhost_hello lazy "^hello.*" ".*" ".*"
前面3个正则代表如下:
conf( "^hello.*"):与资源名称(用户被授予配置权限)相匹配的正则表达式,^hello.*表示配置以hello开头的所有资源。
写( ".*"):与资源名称相匹配的正则表达式,用户被授予写权限,.*表示配置为所有资源。
读( ".*"):与资源名称匹配的正则表达式,已授予用户读取权限.*表示配置为所有资源。
5、修改前面的发送和接受的代码后如下:
发送者代码:
- public class Send {
-
-
-
-
- public static void main(String[] args) throws Exception {
-
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.137.101");
- factory.setPort(5672);
- factory.setVirtualHost("vhost_hello");
- factory.setUsername("lazy");
- factory.setPassword("111111");
-
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
-
- //public final static String HELLO_QUEUE_NAME = "hello";
- channel.queueDeclare(Const.HELLO_QUEUE_NAME, false, false, false, null);
- String message = "Hello World!";
- channel.basicPublish("", Const.HELLO_QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
-
-
- }
- }
接收者代码:
- public class Recv {
-
-
- public static void main(String[] argv) throws Exception {
-
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.137.101");
- factory.setPort(5672);
- factory.setVirtualHost("vhost_hello");
- factory.setUsername("lazy");
- factory.setPassword("111111");
-
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
-
- //public final static String HELLO_QUEUE_NAME = "hello";
- //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- channel.queueDeclare(Const.HELLO_QUEUE_NAME, false, false, false, null);
-
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
-
- DeliverCallback deliverCallback = new DeliverCallback() {
- public void handle(String s, Delivery delivery) throws IOException {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- }
- };
-
-
- //basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
- channel.basicConsume(Const.HELLO_QUEUE_NAME, true, deliverCallback, new CancelCallback() {
- public void handle(String s) throws IOException {
-
- }
- });
- }
- }
6、运行发送者控制台输出:
7、运行接收者控制台输出:
8、查看rabbitmq服务vhost_hello虚拟机上队列创建情况:
rabbitmqctl list_queues -p vhost_hello
至此,我们用java实现了一个基于RabbitMQ简单的发送者和接收者程序!
前面我们说过,rabbitmq这么主流的原因之一就是它支持插件式扩展能力,比如前面通过rabbitmqctl命令管理的方式如果能有个界面化操作的话会更加友好,rabbtimq通过插件的方式提供了一款界面后台管理,默认端口为:15672:
我们直接通过浏览器访问是无法访问到的,我们需要通过插件管理命令rabbitmq-plugins激活插件,激活命令如下:
rabbitmq-plugins enable rabbitmq_management
激活后无需重启服务就可以使用了,访问后界面如下:
这里需要带有如下标签账号才能登录:
标签 | 能力 |
(None) | 无法访问管理插件 |
管理(management) | 用户可以通过消息传递协议执行的任何操作以及: 列出可以通过AMQP登录的虚拟主机 查看“其”虚拟主机中的所有队列,交换和绑定 查看并关闭自己的渠道和联系 查看涵盖其所有虚拟主机的“全局”统计信息,包括其中其他用户的活动 |
政策制定者(policymaker) | 包含management所有权限外加: 查看,创建和删除可通过AMQP登录到的虚拟主机的策略和参数 |
监控(monitoring) | 列出所有虚拟主机,包括它们无法使用消息传递协议访问的虚拟主机 查看其他用户的联系和渠道 查看节点级别的数据,例如内存使用情况和集群 查看所有虚拟主机的真实全局统计信息 |
管理员(administrator) | 包含policymaker和monitoring所有权限外加: 创建和删除虚拟主机 查看,创建和删除用户 查看,创建和删除权限 关闭其他用户的连接 |
标签类似角色,不同标签拥有不同的权限,我们前面创建的lazy账号设置的标签为administrator,类似超级管理员权限,所以lazy当然是可以直接登录的且拥有超管的权限,登录后界面如下:
我们可以通过后台管理界面进行管理用户、虚拟机、交换机、队列、发送消息、连接数等擦操作,功能非常丰富。
关注本人微信公众号,后续将持续rabbitmq集群搭建、组件讲解、配置讲解等更新更多干货!
---------------------- 正文结束 ------------------------
长按扫码关注微信公众号
Java软件编程之家
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。