赞
踩
RabbitMQ是一个开源的AMQP实现,采用erlang语言编写,支持多种客户端,如:Python、java、C、.NET,用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错。
Broker
RabbitMQ的服务端程序,一个mq节点就是一个broker
Producer生产者
创建Messsage发送到RabbitMQ中
Consumer消费者
消费队列中的消息
Message消息
生产消费的内容,有消息头和消息体,包括多个属性的配置,如RoutingKey
Channel信道
一条支持多路复用的通道,独立的双向数据流通道,可以发布订阅、接收消息、信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道
Connection连接
RabbitMQ的socket链接,他封装了socket协议相关的部分逻辑,一个链接上可以有多个信道进行通信
Exchange交换机
生产者将消息通过信道发送给交换机,交换机将消息路由给一个或者多个队列中,交换机和对列是多对多的关系
RoutingKey路由键
生产者将消息发送给路由的时候,都会指定一个RoutingKey,交换机和队列之间绑定一个BindingKey,交换机同过RoutingKey匹配BindingKey发送到指定的队列
BindingKey绑定key
交换机和队列绑定的key
Virtual host虚拟主机
用于不同的业务模块的逻辑隔离,一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同的名称的Exchange和Queue
默认是:/
自行添加:/dev
/prod
/test
#拉取镜像
docker pull rabbitmq:management
docker run -d --hostname rabbit_host --name rabbitmq -e RABBITMQ_DEFAULT_USER=lixiang -e RABBITMQ_DEFAULT_PASS=992184xiang. -p 15672:15672 -p 5672:5672 rabbitmq:management
#介绍
-d 以守护进程方式在后台运行
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
--name:指定容器名
--hostname:设定容器的主机名,它会被写到容器内的 /etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中
-e 参数
RABBITMQ_DEFAULT_USER 用户名
RABBITMQ_DEFAULT_PASS 密码
4369 erlang 发现口
5672 client 端通信口
15672 管理界面 ui 端口
25672 server 间内部通信口
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
简单队列就是最简单的一种模式,有生产者,消费者还有队列组成,生产者将消息发送给队列,消费者从队列中读取消息完成消费。
编码实现
生产者
编码流程:创建连接信息->创建信道->设置队列信息->发布消息
核心API:connection.createChannel()、channel.queueDeclare()、channel.basicPublish()
/**
* 消息的生产者
*/
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
//创建连接参数
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setHost("8.140.116.67");
factory.setVirtualHost("/dev");
//创建连接
try(Connection connection = factory.newConnection()){
//创建信道
Channel channel = connection.createChannel();
//设置队列参数
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
* 第一个参数:队列名称
* 第二个参数:持久话配置
* 第三个参数:是否独占,只能有一个消费者监听队列,发布订阅是独占
* 第四个参数:当没有消费者消费的时候,消息自动删除
* 第五个参数:其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//设置消息内容
String msg = "Hello word";
//推送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
* 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
* 第三个参数:配置信息
* 第四个参数:消息体内容,转换成字节数组
*/
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes(StandardCharsets.UTF_8));
System.out.println("消息成功发送到mq中一条");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
消费者
编码流程:创建连接->创建信道->设置队列信息->创建消费者回调函数->消费消息
核心API:queueDeclare()、new DefualtConusmer(channel)、channel.basicConsume()
/**
* 消息的消费者(持续监听)
*/
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建配置连接信息对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.140.116.67");
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//创建连接,消费者一版不自动关闭,因为持续监听
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//设置队列参数
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body,"utf-8"));
}
};
//消费消息,自动确认
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 第一个参数:队列名称
* 第二个参数:自动确认消费完成
* 第三个参数:消费者的回调函数
*/
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
这种简单队列的模式,系统会为每个队列隐式地绑定一个默认交换机,交换机名称为" (AMQP default)",类型为直连 direct,当你手动创建一个队列时,系统会自动将这个队列绑定到一个名称为空的 Direct 类型的交换机上,绑定的路由键 routing key 与队列名称相同。
工作队列:使用于生产者能力大于消费者的场景,增多消费者节点,有两中策略,默认是round robin轮询策略,还有一个是公平策略
轮询策略
在简单队列的基础上,把消费确认机制改成手动确认,并且加上延时。
核心API:channel.basicAck(envelope.getDeliveryTag(),false)
/**
* 消息的消费者(持续监听)
*/
public class Recv1 {
private final static String QUEUE_NAME = "work_ms_rr";
public static void main(String[] args) throws IOException, TimeoutException {
//创建配置连接信息对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.140.116.67");
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//创建连接,消费者一版不自动关闭,因为持续监听
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//设置队列参数
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/**
* 模拟消费延迟
*/
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body,"utf-8"));
//手工确认消息消费,不是多条确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//消费消息,自动确认
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 第一个参数:队列名称
* 第二个参数:自动确认消费完成
* 第三个参数:消费者的回调函数
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
公平策略
解决消费者能力不足的问题,降低消费时间问题
channel中设置消费者每次消费一个,消费完在进入下一个
//限制消费者每次消费一个,消费完在消费下一个
channel.basicQos(1);
/**
* 消息的消费者(持续监听)
*/
public class Recv1 {
private final static String QUEUE_NAME = "work_ms_fair";
public static void main(String[] args) throws IOException, TimeoutException {
//创建配置连接信息对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.140.116.67");
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//创建连接,消费者一版不自动关闭,因为持续监听
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//设置队列参数
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//限制消费者每次消费一个,消费完在消费下一个
channel.basicQos(1);
//消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/**
* 模拟消费延迟
*/
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("body:"+new String(body,"utf-8"));
//手工确认消息消费,不是多条确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//消费消息,自动确认
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 第一个参数:队列名称
* 第二个参数:自动确认消费完成
* 第三个参数:消费者的回调函数
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
生产者将消息发送到Exchange,交换机将消息路由到一个或者多个队列中,交换机有四个类型,队列和交换机是多对多的关系。
交换机只负责转发消息,不具备存储消息的能力,如果没有队列和交换机绑定,或者没有符合规则的路由,消息将被丢失。
RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange
交换机类型
RabbitMQ默认自带7个交换机
RabbitMQ发布订阅模型
发布-订阅模型中,消息生产者不在直接面对queue,而是直面exchange,都需要经过exchange俩进行消息的转发,不需要指定routingKey,所有发送到同一个fanout交换机的消息都会被监听这个交换机的队列收到。
发布订阅模型应用场景
发布订阅模型通过把消息发送给交换机,交换机转发给对应的绑定队列,交换机绑定的队列是排它独占队列,自动删除。
编码实战
生产者
编码流程:创建连接对象->创建信道->绑定交换机->推送消息
核心API:channel.exchangeDeclare(EXCHANGE_NAME,“fanout”)
/**
* 消息的生产者
*/
public class Send {
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) {
//创建连接参数
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setHost("8.140.116.67");
factory.setVirtualHost("/dev");
//创建连接
try(Connection connection = factory.newConnection()){
//创建信道
Channel channel = connection.createChannel();
//绑定交换机,fanout_exchange,广播,第一个参数交换机名称,第二个参数交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//设置消息内容
String msg = "小滴课堂大课训练营发布";
//推送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
* 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
* 第三个参数:配置信息
* 第四个参数:消息体内容,转换成字节数组
*/
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8));
System.out.println("广播消息发送成功");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
消费者
创建连接对象->创建信道->绑定交换机->获取队列名称->绑定交换机和队列->消费者消费消息
核心API:channel.queueDeclare().getQueue(),channel.queueBind(queue,EXCHANGE_NAME,“”)
/**
* 消息的消费者(持续监听)
*/
public class Recv2 {
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//创建配置连接信息对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.140.116.67");
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//创建连接,消费者一版不自动关闭,因为持续监听
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
//获取对列名称
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列,队列名称,交换机名称,routingKey
channel.queueBind(queue,EXCHANGE_NAME,"");
//消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/**
* 模拟消费延迟
*/
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("body:"+new String(body,"utf-8"));
//手工确认消息消费,不是多条确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//消费消息,自动确认
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 第一个参数:队列名称
* 第二个参数:自动确认消费完成
* 第三个参数:消费者的回调函数
*/
channel.basicConsume(queue,false,consumer);
}
}
路由模式简介
交换机类型:Direct
队列和交换机绑定,需要指定一个路由key(BindingKey)
消息生产者发送消息到交换机,需要指定RoutingKey
交换机根据消息的路由key,转发给对应的队列
例子:日志采集系统,一个队列收集错误日志,一个队列收集全部日志
编码实战
/**
* 消息的生产者
*/
public class Send {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) {
//创建连接参数
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setHost("8.140.116.67");
factory.setVirtualHost("/dev");
//创建连接
try(Connection connection = factory.newConnection()){
//创建信道
Channel channel = connection.createChannel();
//绑定交换机,直连交换机direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//设置消息内容
String errMsg = "错误日志";
String infoMsg = "日常日志";
String warnMsg = "警告日志";
//推送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
* 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
* 第三个参数:配置信息
* 第四个参数:消息体内容,转换成字节数组
*/
//发送消息指定routingKey
channel.basicPublish(EXCHANGE_NAME,"errRoutingKey",null,errMsg.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"infoRoutingKey",null,infoMsg.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"warnRoutingKey",null,warnMsg.getBytes(StandardCharsets.UTF_8));
System.out.println("消息成功发送到mq中一条");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 消息的消费者(持续监听)
*/
public class Recv {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
//创建配置连接信息对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.140.116.67");
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//创建连接,消费者一版不自动关闭,因为持续监听
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机,direct直连模式
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
//获取队列名称
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"errRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"warnRoutingKey");
//消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body,"utf-8"));
}
};
//消费消息,自动确认
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 第一个参数:队列名称
* 第二个参数:自动确认消费完成
* 第三个参数:消费者的回调函数
*/
channel.basicConsume(queueName,true,consumer);
}
}
/**
* 消息的消费者(持续监听)
*/
public class Recv2 {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
//创建配置连接信息对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.140.116.67");
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//创建连接,消费者一版不自动关闭,因为持续监听
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机,direct直连模式
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
//获取队列名称
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"errRoutingKey");
//消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body,"utf-8"));
}
};
//消费消息,自动确认
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 第一个参数:队列名称
* 第二个参数:自动确认消费完成
* 第三个参数:消费者的回调函数
*/
channel.basicConsume(queueName,true,consumer);
}
}
背景
主题模式简介
交换机是topic,可以实现发布订阅模式fanout和路由模式Direct的功能,更加灵活,支持模式匹配和通配符匹配。
交换机通过通配符转发到相应的队列,*代表一个词,#代表一个或者多个词。
注意:交换机和队列绑定时用的binding使用通配符的路由键,生产者和交换机绑定要使用具体的路由键。
例子:日志采集系统,一个队列中收集全部的订单日志信息,order.log.#,一个队列中收集群全部的日志消息*.log.#
编码实战
/**
* 消息的生产者
*/
public class Send {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) {
//创建连接参数
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setHost("8.140.116.67");
factory.setVirtualHost("/dev");
//创建连接
try(Connection connection = factory.newConnection()){
//创建信道
Channel channel = connection.createChannel();
//绑定交换机,直连交换机direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//设置消息内容
String errMsg = "错误日志";
String infoMsg = "日常日志";
String warnMsg = "警告日志";
//推送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
* 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
* 第三个参数:配置信息
* 第四个参数:消息体内容,转换成字节数组
*/
channel.basicPublish(EXCHANGE_NAME,"order.log.error",null,errMsg.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"order.log.info",null,infoMsg.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"order.log.warn",null,warnMsg.getBytes(StandardCharsets.UTF_8));
System.out.println("消息成功发送到mq中一条");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 消息的消费者(持续监听)
*/
public class Recv {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
//创建配置连接信息对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.140.116.67");
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//创建连接,消费者一版不自动关闭,因为持续监听
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机,direct直连模式
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
//获取队列名称
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");
//消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body,"utf-8"));
}
};
//消费消息,自动确认
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 第一个参数:队列名称
* 第二个参数:自动确认消费完成
* 第三个参数:消费者的回调函数
*/
channel.basicConsume(queueName,true,consumer);
}
}
/**
* 消息的消费者(持续监听)
*/
public class Recv2 {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
//创建配置连接信息对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.140.116.67");
factory.setUsername("admin");
factory.setPassword("992184xiang.");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//创建连接,消费者一版不自动关闭,因为持续监听
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机,direct直连模式
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
//获取队列名称
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"*.log.#");
//消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body,"utf-8"));
}
};
//消费消息,自动确认
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 第一个参数:队列名称
* 第二个参数:自动确认消费完成
* 第三个参数:消费者的回调函数
*/
channel.basicConsume(queueName,true,consumer);
}
}
简单模式
一个生产者、一个消费者,不用指定交换机,使用默认的交换机。
工作模式
一个生产者,多个消费,可以有轮询和公平策略,不用指定交换机,使用默认的交换机。
发布订阅模式
fanout类型交换机,通过交换机和队列绑定,不用指定绑定的路由键,生产者发送消息到交换机,fanout交换机直接进行转发,消息不用指定routingKey路由键
路由模式
direct类型交换机,交换机和队列绑定,指定的绑定的路由键,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息指定的RoutingKey要和交换机绑定队列的bindingKey一致进行转发。
主题模式
topic交换机,交换机和队列绑定,指定绑定的通配符,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息要制定routingKey路由键
创建配置连接对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort("5672");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/dev");
//创建连接对象
Connection connection = factory.newConnection();
创建信道
Channel channel = connection.createChannel();
绑定队列
/**
* 第一个参数:队列的名称
* 第二个参数:是否持久化配置
* 第三个参数:是否独占,发布订阅模型一般独占
* 第四个参数:自动删除,当没有消费者的消费消息的时候,是否自动删除消息
* 第五个参数:其他参数
**/
channel.queueDeclare(queueName,false,false,false,null);
推送消息
/**
* 第一个参数:交换机名称
* 第二个参数:队列名称
* 第三个参数:配置信息
* 第四个参数:发送消息的字节数组
**/
channel.basicPublish("",queueName,null,msg.getBytes(StandardCharsets.UTF_8));
消费消息
//消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body,"utf-8"));
}
};
/**
* 第一个参数:队列名称
* 第二个参数:是否自动消费完成
* 第三个参数:消费者的回调函数
**/
channel.basicConsume(queueName,true,consumer);
限制消费者每次只能消费一个
channel.basicQos(1);
手工确认消费消息
channel.basicAck(envelope.getDeliveryTag,false);
绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
获取队列名称
String queueName = channel.queueDeclare().getQueue();
绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");
Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等.
提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发.
总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter.
<!-- 代码库 -->
<repositories>
<repository>
<id>maven-ali</id>
<url>http://maven.aliyun.com/nexus/content/groups/public//</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>public</id>
<name>aliyun nexus</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<!--引入AMQP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml配置文件修改
#消息队列
spring:
rabbitmq:
host: 10.211.63.14
port: 5672
virtual-host: /dev
password: 123456
username: admin
RabbitMQConfig配置类
@Configuration
public class RabbitMQConfig{
public static final String EXCHANGE_NAME = "exchange_order";
public static final String QUEUE = "order_queue";
/**
* 交换机
* @return
*/
@Bean
public Exchange orderExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(ture).build();
}
/**
* 队列
* @return
*/
@Bean
public Queue orderQueue(){
return QueueBuilder.durable(QUEUE).build();
}
/**
* 交换机和队列绑定关系
*/
@Bean
public Binding orderBinding(Queue queue,Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
}
生产者发送消息
rabbitTemplate.converAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单");
消费者监听消息
@Component
@RabbitMQListener(queue = "order_queue")
public class OrderMQListener{
/**
* RabbitHandler 会自动匹配 消息类型(消息自动确认)
* @param msg
* @param message
* @throws IOException
*/
@RabbitHandler
public void messageHandler(String body,Message message){
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+message.toString());
System.out.println("监听到消息:消息内容:"+message.getBody());
}
}
生产者到交换机:confirmCallback
交换机到队列:returnCallback
**注意:**开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,rabbitmq整体的新跟那个效率会变低,吞吐量下降严重,不是非常重要的消息不建议开启消息确认机制
生产者到交换机
通过confirmCallback
生产者投递消息后,如果Broker收到消费后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种法师是方式可靠性投递的核心。
开启confirmCallback
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换机Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值事发布消息成功到交换机后触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated
编码实战
核心API:setConfirmCallback() 、confirm(配置,是否接到消息,失败的原因)
@Test
void testConfirmCallback(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 配置
* @param ack 交换机是否收到消息,true是成功,false是失败
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback======>");
System.out.println("correlationData======>"+correlationData);
System.out.println("ack======>"+ack);
System.out.println("cause======>"+cause);
if(ack){
System.out.println("发送成功");
//更新数据库的状态,状态为成功
}else {
System.out.println("发送失败,记录到日志或者数据库");
//更新数据库的状态,状态为失败
}
}
});
//数据库新增一个消息记录,状态是发送,发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new", "中台收到一条新订单");
}
**模拟异常:**修改交换机名称
交换机到队列
#配置为true,则交换机处理消息到路由失败,会返回给生产者
spring.rabbitmq.template.mandatory=true
#或者在temlate对象上设置
template.setMandatory(true);
第一步:开启returnCallback配置
#新版
spring.rabbitmq.publisher-returns=true
第二步:修改交换机投递到队列失败的策略
#为true,则交换机处理消息到路由失败会返回给生产者
spring.rabbitmq.template.mandatory=true
编码实战
@Test
void testReturnCallback(){
//publisher-returns为true则交换机处理消息到路由失败,返回给生产者
//mandatory为true则消息未被路由到任何一个queue,则回退一条消息给生产者
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("ReturnedMessage:" + returned.toString());
int replyCode = returned.getReplyCode();
System.out.println("_______________________");
System.out.println("replyCode:" + replyCode);
}
});
//数据库新增一个消息记录,状态是发送,发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new", "中台收到一条新订单");
}
}
背景:消费者从broker中监听消息,需要确保消息被合理的处理掉
RabbitMQ的ACK介绍
确认方式
spring:
rabbitmq:
#开启手动确认消息,如果消息重新入队,进行重试
listener:
simple:
acknowledge-mode: manual
@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {
/**
* 处理器,适配器,加上@RabbitHandler注解
* 加上Channel这个参数
*/
@RabbitHandler
public void messageHandler(String body, Message message, Channel channel) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
System.out.println("message:"+message.toString());
System.out.println("==============");
System.out.println("消息标识tag:"+tag);
System.out.println("消息体body:"+body);
//第一个参数是该消息的index,第二个是是否批量操作
channel.basicAck(tag,false);
//第一个参数是index,第二个是否批量,第三个是失败后是否重新返回给生产者重新投递
//channel.basicNack(msgTag,false,true);
}
}
deliveryTage介绍
basicNack和basicReject介绍
使用场景
业界一些实现方式
/**
* mq配置类
* 新商家上架->new_merchant_queue->死信交换机->死信队列
*/
@Configuration
public class RabbitMQConfig {
/**
* 死信队列
*/
public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
/**
* 死信交换机
*/
public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
/**
* 进入死信队列的路由key
*/
public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
/**
* 创建死信交换机
* @return
*/
@Bean
public Exchange lockMerchantDeadExchange(){
return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE,true,false);
}
/**
* 创建死信队列
* @return
*/
@Bean
public Queue lockMerchantDeadQueue(){
return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
}
/**
* 绑定死信交换机和死信队列
* @return
*/
@Bean
public Binding lockMerchantBinding(){
return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE,
LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null);
}
/**
* 普通队列,绑定的个死信交换机
*/
public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
/**
* 普通的topic交换机
*/
public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
/**
* 路由key
*/
public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";
/**
* 创建普通交换机
* @return
*/
@Bean
public Exchange newMerchantExchange(){
return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false);
}
/**
* 创建普通队列
* @return
*/
@Bean
public Queue newMerchantQueue(){
Map<String,Object> args = new HashMap<>(3);
//消息过期后,进入到死信交换机
args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE);
//消息过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key",LOCK_MERCHANT_ROUTING_KEY);
//过期时间,单位毫秒
args.put("x-message-ttl",20000);
return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
}
/**
* 绑定交换机和队列
* @return
*/
@Bean
public Binding newMerchantBinding(){
return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE,
NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null);
}
}
@Component
@RabbitListener(queues = RabbitMQConfig.LOCK_MERCHANT_DEAD_QUEUE)
public class OrderMQListener {
/**
* 处理器,适配器,加上@RabbitHandler注解
*/
@RabbitHandler
public void messageHandler(String body, Message message, Channel channel) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
System.out.println("message:"+message.toString());
System.out.println("==============");
System.out.println("消息标识tag:"+tag);
System.out.println("消息体body:"+body);
//如果发生异常就重新入队
try{
channel.basicAck(tag,false);
}catch (Exception e){
//拒收消息,重新入队
channel.basicNack(tag,false,true);
}
}
}
普通集群
镜像集群
注意:集群需要保证各个节点有相同的token令牌,集群内各个节点的erlang.cookie需要相同,才可以相互通信
准备三个mq节点
#节点一,主节点,创建-v映射目录
docker run -d --hostname rabbit_host1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
#节点二,创建-v映射目录
docker run -d --hostname rabbit_host2 --name rabbitmq2 -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_host1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/2/log:/var/log/rabbitmq rabbitmq:management
#节点三,创建-v映射目录
docker run -d --hostname rabbit_host3 --name rabbitmq3 -p 15674:15672 -p 5674:5672 --link rabbitmq1:rabbit_host1 --link rabbitmq2:rabbit_host2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/3/log:/var/log/rabbitmq rabbitmq:management
参数说明:
--hostname:自定义Docker容器的hostname
--link:容器之间连接,link不可缺,使得三个容器相互通信
--privileged=true:使用该参数,container内的root拥有真正的root权限,否则容器出现perission denied
-v:宿主机和容器路径映射
RABBITMQ_DEFAULT_USER=admin:配置用户名
RABBITMQ_DEFAULT_PASS=123456:配置密码
Erlang Cookie值必须相同,相当于不同节点间通信的密钥。
配置集群:
#节点一配置集群
docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
#节点二假如集群,--ram是以内存方式加入,忽略该参数默认未磁盘节点
docker exce -it rabbitmq2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
exit
#节点三假如集群,--ram是以内存方式加入,忽略该参数默认未磁盘节点
docker exce -it rabbitmq3 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
exit
#查看集群节点状态,配置启动三个节点,1个磁盘节点和2个内存节点
rabbitmqctl cluster_status
访问节点1的web控制台,可以看到多个节点
测试:node1节点创建队列,发送消息(可以选择消息的持久化,Spring-AMPQ中默认就是持久化),node2和node3都可以看到消息同步,kill掉node1节点,发现node2和node3为NaN模式,如果是非主节点创建队列和发送消息,则其他队列也可以显示。
背景:解决普通集群主节点宕机出现数据丢失的现象
RabbitMQ的策略policy是用来控制和修改集群的bhost队列和Exchange复制行为,就是要设置那些Exchange或者queue的数据需要复制、同步,以及如何复制同步
创建一个策略来匹配队列
路径:rabbitmq管理界面->Admin->Policies->Add/update a policy
参数:策略会同步一个VirtualHost中的交换机和队列数据
name:自定义策略名称
Pattern:^匹配符,代表匹配所有
Definition:ha-mode=all为匹配类型,分为3中模式
ha-model:指明镜像队列的模式,可选下面一种
all:表示在集群中所有的节点上进行镜像同步
exactly:表示在指定个数的节点上进行镜像同步,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像同步,界定啊名称由ha-params指定
ha-sync-mode:镜像消息同步方式automatic(自动),manually(手动)
配置好后,+2的意思就是由三个节点,一个节点本身和两个镜像节点
集群重启顺序
把host和port节点去掉换成addresses :10.211.55.13:5672,10.211.55.13:5673,10.211.55.13:5674
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。