赞
踩
AMQP(Advanced Message Queue Protocol)高级消息队列协议。协议无非就是需要遵循一定的数据规范,是在线路层上建立的,也就是应用层协议的一个开放标准,定义了网络交互的数据格式,而不是API接口(例如JMS),这使得AMQP和JMS从本质上的区别。它天然就是跨平台的,就像SMTP、HTTP 等协议样,只要开发者按照规范的格式发送数据,任何平台都可以通过AMQP进行消息交互。像目前流行的 StormMQ、RabbitMQ 等都实现了 AMQP。
RabbitMQ是一个实现了 AMQP 的开源消息中间件,使用高性能的 Erlang 编写。 RabbitMQ具有可靠性、支持多种协议、高可用、支持消息集群以及多语言客户端等特点,在分布式系统中存储转发消息,具有不错的性能表现。
参考链接:https://www.helloweba.net/server/624.html
由于RabbitMQ使用Erlang编写,安装 RabbitMQ 之前要安装 Erlang,需要先到RabbitMQ官网看下版本对应关系。可以分别在Erlang的Github和RabbitMQ官网下载对应的版本的rpm包。下载步骤如下:
# 下载erlang wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.2.4/erlang-23.2.4-1.el7.x86_64.rpm # 下载rabbitmq wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.11/rabbitmq-server-3.8.11-1.el7.noarch.rpm # 安装socat依赖,安装Erlang时需要 yum install -y socat # 安装Erlang rpm -ivh erlang-22.2-1.el7.x86_64.rpm # 查看Erlang安装版本 erl -version # 6.安装RabbitMQ rpm -ivh rabbitmq-server-3.8.1-1.el7.noarch.rpm # 启动rabbitmq systemctl start rabbitmq-server 或 service rabbitmq-server start # 查看状态 rabbitmqctl status # 启用网页版后台管理插件 rabbitmq-plugins enable rabbitmq_management # 重启rabbitmq systemctl restart rabbitmq-server 或 service rabbitmq-server restart #添加一个用户名为admin,密码为admin的用户 rabbitmqctl add_user admin admin #设置admin用户的角色为管理员 rabbitmqctl set_user_tags admin administrator #配置admin用户可以远程登录 rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
rabbitmq其他常用操作:
# 可以设置rabbitmq开机自启
systemctl enable rabbitmq-server
# 关闭服务
rabbitmqctl stop
插件管理:
#插件列表:
rabbitmq-plugins list
#启动插件:
rabbitmq-plugins enable XXX (XXX为插件名)
#停用插件:
rabbitmq-plugins disable XXX
RabbitMQ启动成功后,默认有一个guest用户,但是该用户只能在本地登录,无法远程登录,因此本案例中添加了一个新的用户sang,也具有管理员身份,同时可以远程登录。当RabbitMQ启动成功后,在物理机浏览器上输入虚拟机地址: http://localhost:15672,15672是RabbitMQ的默认端口,如下:
登录admin如下:
Spring Boot为AMQP提供了自动化配置依赖spring-boot-starter-amqp ,因此首先创建Spring Boot项目并添加该依赖,如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置RabbitMQ连接信息如下:
spring:
rabbitmq:
# rabbitmq地址、端口、用户名、密码
host: localhost
# 注意:该端口不是15672
port: 5672
username: admin
password: admin
由于所有的消息生产者提交的消息都会交由Exchange进行再分配,Exchange会根据不同的策略将消息分发到不同的Queue 中。RabbitMQ中一共提供了4种不同的Exchange策略,分别是 Direct、Fanout、Topic以及Header,这4种不同的策略中前3种的使用频率较高,第4种的使用频率较低,下面分别对这4种不同的ExchangeType予以介绍。
Direct的策略是将消息队列绑定到DirectExchange上,即将消息转发到与该条消息key相同的Queue中,例如该队列的名称为"queue-name",监听的key为"queue-name"的消息会被该消息队列接收。RabbitMQDirectConfig如下:
@Configuration public class RabbitMQDirectConfig { public final static String DIRECTNAME = "weiyh-direct"; @Bean Queue queue() { // 队列名称,用于接收者监听 return new Queue("queue-name"); } @Bean DirectExchange directExchange() { // 消息名称,重启后是否有效,长期未用是否删除 return new DirectExchange(DIRECTNAME, true, false); } @Bean Binding binding() { // 将队列以direct的方式进行绑定 return BindingBuilder.bind(queue()).to(directExchange()).with("direct"); } }
消息消费者DirectReceiver如下:
@Component
public class DirectReceiver {
// 监听的队列名称
@RabbitListener(queues = "queue-name")
public void handler(String msg) {
System.out.println("DirectReceiver:" + msg);
}
}
在测试类中注入RabbitTemplat 对象来进行消息发送,如下:
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
System.out.println("nihao");
rabbitTemplate.convertAndSend("queue-name", "===========>hello direct");
}
}
记得启动RabbitMQ,然后在启动测试类,成功输出如下:
RabbitMQ控制台可以看到:
Fanout的策略是把所有到达FanoutExchange的消息转发给所有与它绑定的Queue,这个过程中key不起任何作用。创建两个Queue,将两个Queue绑定到FanoutExchange中,如下:
@Configuration public class RabbitMQFanoutConfig { public final static String FANOUTNAME = "weiyh-fanout"; @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUTNAME, true, false); } @Bean Queue queueOne() { return new Queue("queue-one"); } @Bean Queue queueTwo() { return new Queue("queue-two"); } @Bean Binding bindingOne() { // 将队列queueOne绑定到fanoutExchange中 return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } @Bean Binding bindingTwo() { // 将队列queueTwo绑定到fanoutExchange中 return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); } }
创建两个消费者如下:
@Component
public class FanoutReceiver {
@RabbitListener(queues = "queue-one")
public void handlerOne(String msg) {
System.out.println("FanoutReceiver.handlerOne:" + msg);
}
@RabbitListener(queues = "queue-two")
public void handlerTwo(String msg) {
System.out.println("FanoutReceiver.handlerTwo:" + msg);
}
}
测试:
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
// exchange策略名称,key为null,发送内容
rabbitTemplate.convertAndSend(RabbitMQFanoutConfig.FANOUTNAME, null,
"========>hello fanout");
}
}
注意:这里发送消息时不需要key,指定exchange即可,key为null。
一条消息发出后,所有与fanout绑定的Queue都接收到了消息,输出如下:
首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP请求中的请求头。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash内容匹配上的时候,消息就会被写入队列。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。