赞
踩
引用官方的说法:RabbitMQ是轻量级的,易于部署。它支持多个消息传递协议。RabbitMQ可以部署在分布式和集群配置中,以满足高规模、高可用性的需求。
可以去拜读一下官网:RabbitMQ官网
这了只介绍了Linux环境下的安装,安装环境为CentOS7版本。其他环境参照官方安装教程
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
yum install epel-release
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
yum install erlang -y
# 下载
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3-1.el7.noarch.rpm
# 安装依赖
yum install -y socat
# 安装rabbitMQ
rpm -ivh rabbitmq-server-3.8.3-1.el7.noarch.rpm
# 配置开机启动
chkconfig rabbitmq-server on
# 启动
service rabbitmq-server start
# 启动管理服务
rabbitmq-plugins enable rabbitmq_management
netstat -ntlp
# 端口号 15672 :管理页面端口
使用默认用户 guest/guest 登录(rabbitmq从3.3.0开始禁止使用guest/guest权限通过除localhost外的访问)
如果需要远程登录需要如下配置:
# 找到这个文件
vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.3/ebin/rabbit.app
# loopback_users里的<<”guest”>>删除。
# 重新启动
systemctl restart rabbitmq-server
管理界面
1、超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。
这里使用简单队列,一个生产者发送消息到队列,由一个消费者来进行消费。
package com.iweb; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class Send { // 队列的名称 private final static String QUEUE_NAME = "hello-world"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.100.101"); factory.setVirtualHost("host-test"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); String message = scanner.nextLine(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
运行生产者代码并输入:hello world
点击hello-wordl
进入队列
package com.iweb; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; public class Recv { private final static String QUEUE_NAME = "hello-world"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.100.101"); factory.setVirtualHost("host-test"); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } catch (Exception e) { e.printStackTrace(); } } }
到这里一个简单的RabbitMQ工程完成。
Work Queues
由生产者将消息发给队列,可以由多个消费者进行消费,且消费全局有序。
package com.iweb; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.TimeoutException; public class Send { private final static String QUEUE_NAME = "hello-world"; private final static String HOST = "192.168.100.101"; private final static String V_HOST = "host-test"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost(V_HOST); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello world"; for (int i = 1; i <= 1000; i++) { channel.basicPublish("", QUEUE_NAME, null, String.format("%s-%s", message, i).getBytes()); } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (channel != null) { try { channel.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
运行生产者代码,会向队列中发送1000条消息。
我们假设每次消费需要花费 500
毫秒。注释部分先不要管
package com.iweb; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; public class Recv { private final static String QUEUE_NAME = "hello-world"; private final static String HOST = "192.168.100.101"; private final static String V_HOST = "host-test"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost(V_HOST); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 每次只发送一条消息 // channel.basicQos(1); Channel finalChannel = channel; DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); try { // 模拟消费的时间 Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); // 开启手动消费 : 未消费的消息在消费者停止的时候会还给队列 // finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); // 对应手动消费 //channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {}); } catch (Exception e) { e.printStackTrace(); } } }
我们先打开一个消费者,消费者正常的有序消费
我们再开启一个消费者,发现没有消费到数据
原因说明:消费默认的消费模式是批处理,当打开消费者1后消息会一下子全部涌入消费者通道,所有再开消费者2,这是队列已经空了,消费者2将获取不到数据。而消费者消费消息的确认模式默认的是自动模式,也就是说无论消息是否被成功消费,都被认为消费。此时就算消息没有被完全消费,强制退出消费者1,消费者1也不会将数据返回队列。
所有我们做如下设置,就是上面消费者的注释部分
。
这个是再重新执行代码,两个消费者正常消费
先构建一个类用来获取会话对象
package com.iweb; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { private final static String HOST = "192.168.100.101"; private final static String V_HOST = "host-test"; public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setVirtualHost(V_HOST); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); return connectionFactory.newConnection(); } }
Exchange类型,分别是Direct 、Fanout 、Topic、Headers
生产者每次发送消息的时候会携带一个routingkey
,exchange
会根据routingkey
将消息发送到指定队列。也就是官网的 Routing
模式
package com.iweb; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private final static String EXCHANGE_NAME = "direct.exchange"; private final static String EXCHANGE_TYPE = "direct"; private final static String QUEUE_NAME1 = "direct.queue1"; private final static String QUEUE_NAME2 = "direct.queue2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明 交换机 指定交换机类型 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); // 声明队列 channel.queueDeclare(QUEUE_NAME1, false, false, false, null); channel.queueDeclare(QUEUE_NAME2, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "orange"); // routingkey = orange channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "black");// routingkey = black channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "green");// routingkey = green // 发送消息到交换机:注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中 channel.basicPublish(EXCHANGE_NAME, "orange", null, "orange".getBytes()); System.out.println(" [x] Sent 'orange'"); channel.basicPublish(EXCHANGE_NAME, "black", null, "black".getBytes()); System.out.println(" [x] Sent 'black'"); channel.basicPublish(EXCHANGE_NAME, "green", null, "green".getBytes()); System.out.println(" [x] Sent 'green'"); channel.close(); connection.close(); } }
运行生产者代码,查看管理UI
这个时候我们可以来写消费者代码了。消费者代码同 work queues
模式,只要修改队列名称即可。这里不贴代码了。
Fanout 模式,exchange
会将消息发送给每个队列。
改造上面的生产者代码,其他不需要动:
执行代码后查看管理UI
消费者代码同 work queues
模式,只要修改队列名称即可。
此类型exchange和direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:’*’,’#’.
改造 Direct 的生产者代码
修改名称
修改队列绑定key
修改发送的数据
这里 user.name将会发送给 user.* 和 user.#,而user.name.age只会发送给user.#
查看管理UI
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。