赞
踩
本文为观看B站RabbitMQ学习视频后整理的相关知识点,可能有不足的地方,欢迎指正。
RabbitMQ学习——飞哥/狂神
AMQP模型中定义了三个角色,分别是:exchange交换机,message queue消息队列,binding绑定关系;
exchange交换机,用于接收信息生产者生产的消息,在AMQP中消息并不会直接由生产者交给队列,而是通过exchange交换机来进行消息接收,而后根据消息中的部分属性,即表示到达目标队列的信息,由交换机来进行转发,有点像网络中的路由器,但转发到哪个消息队列还和交换机的工作模式有关;
message queue消息队列,用于接收从exchange交换机发送过来的消息,队列中的每一条消息只允许被消费一次,不允许多次消费。message queue消息队列拿到exchange交换机推送的消息后,再推送给监听了当前队列的消费者;
如果有多个消费者监听同一个message queue消息队列怎么办?
binding绑定关系,将message queue消息队列绑定到exchange交换机上,绑定完成之后,每次推送到exchange交换机上的信息都会根据当前工作模式以及binding绑定关系将交换机上的消息推送到message queue消息队列上;
Simple:
该模式主要的重点放在了消费端和队列上,即一个队列只使用一个消费端监听,实现一一对应的关系,在交换机层面,该队列可以绑定不同类型的交换机,只不过在这里,最简单的时候是不绑定交换机,这种情况下会认为绑定了默认交换机,而此时的路由key则为队列的队列名。
Work queues:
该模式主要的重点放在了消费端和队列上,从图中可以看出,两个消费端监听了同一个消息队列,由于消息队列种的消息只允许被消费一次,于是这里涉及到了一个谁消费哪些消息的问题,下面讲的两种分发模式就是来解决这个问题的。
轮询分发模式:
轮询分发模式会把信息尽量平均地分发给已经监听了该端口的所有消费者,比如在有两个消费端,消息队列有10条消息的情况下,消费端1固定处理0,2,4,6,8序号的消息,消费端2固定处理1,3,5,7,9序号的消息。实现轮询分发模式与消费端的应答方式有关,如果是自动应答则是这种工作模式;
公平分发模式:
公平分发模式,则是将消息一个一个传递给监听了该端口的消费者,秉持谁工作快谁多处理一些消息的原则。因此,为了确认每个消费者的消费速度,处于公平分发模式下的消费者需要实现手动应答,同时队列也要关闭自动应答的设置,否则会出现bug。
轮询分发和公平分发更多的是一种概念,具体实现还是需要通过代码去修改相关的配置,才能进一步实现。而RabbitMQ本身并没有规定,或者说将这种模式包装,因此才认为轮询分发和公平分发是一种概念,当然,如果使用Spring Boot整合RabbitMQ,那么这一部分的工作是已经整合好的,可以直接使用。
Publish/Subscribe:
前面提到,在消息队列中,一条消息只能被消费一次,那么是否有办法让一条消息可以被两个消费端消费?发布订阅模式正好可以解决这样的问题。在发布订阅模式中,关注的更多是交换机与消息队列之间的关系,即交换机会把它受到的所有消息都推送到绑定了该交换机的消息队列中,该模式下交换机的type为fanout,且与消息队列绑定的路由key没有关系,设不设置都无所谓。
Routing:
路由模式中,交换机的type为direct,通过绑定到交换机上的消息队列和其路由key,交换机根据消息携带的路由key,有选择地将其转发到目标消息队列中,实现精准消费,最常见的工作模式。
Topics:
主题模式中,可以通过模糊匹配的方式进行消息推送,具体方式是这样的,消息队列绑定到交换机上的路由key依然是具体的,但是消息携带的目标路由key则是带有通配符 “*” 或者 “#” 的,"*" 可以匹配0个或多个元素,而 “#” 只能匹配且必须匹配一个,基于这种匹配规则,交换机将获得的消息分别推送到目标消息队列中。主题模式下交换机的type为topics。
Headers:
交换机type中还有一个headers类型,该类型的推送原则是,依据消息中携带的参数进行推送,此时推送标准是消息队列与交换机绑定的参数,而非路由key。该工作模式在RabbitMQ并没有提及,猜测是应用比较少的原因。
Producer代码:
public class Producer { public static void main(String[] args) { //1.创建连接工厂,设置好相关配置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("xxx.xxx.xxx.xxx"); factory.setPort(5672); factory.setUsername("username"); factory.setPassword("password"); factory.setVirtualHost("/"); //2.创建连接Connection Connection connection = null; Channel channel = null; try { connection = factory.newConnection("生产者"); //3.通过连接获取通道Channel channel = connection.createChannel(); //4.通过通道创建交换机,声明队列,绑定关系,路由key,发送信息和接收信息 //@params1 队列的名称 //@params1 是否要持久化 //@params1 排他性设置,是否是独占队列 //@params1 是否自动删除,随着最后一个消费者消费完毕消息是否自动删除队列 //@params1 附属参数 channel.queueDeclare("生产者队列",false,false,false,null); //5.发送消息给队列 //@params1 交换机 //@params1 目标队列名称 //@params1 参数 //@params1 推送信息的字节数组 channel.basicPublish("","生产者队列",null,"Hello, rabbitmq!!!".getBytes()); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { //6.关闭通道 if (channel!=null&&channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection!=null&&connection.isOpen()){ //7.关闭连接 try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
Consumer代码:
public class Consumer { public static void main(String[] args) { // 创建连接工厂,设置相关信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("xxx.xxx.xxx.xxx"); factory.setPort(5672); factory.setUsername("username"); factory.setPassword("password"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 创建连接 connection = factory.newConnection(); // 获取通道 channel = connection.createChannel(); // 消费者监听端口,第二个参数为true,设置是否自动应答 channel.basicConsume("生产者队列", true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println("接收成功"); System.out.println("接收到的信息为:" + new String(delivery.getBody(),"UTF-8")); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("接受失败"); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally{ if (channel!=null&&channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection!=null&&connection.isOpen()){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } }
为什么RabbitMQ是基于通道进行操作而不是基于连接?
AMQP是一个使用TCP提供可靠投递的应用层协议,其连接通常是长连接。有些应用需要与AMQP代理建立多个连接,如果基于连接进行操作,需要开启多个TCP连接,会消耗过多的系统资源并且使得防火墙的配置更加困难,而基于通道进行操作,则可以保证在只有一个TCP连接的情况下,开启多个Channel进行通信。
此外,在多线程/进程的应用中,为多线程/进程之间开辟的channel信道互相之间是完全隔离的,保证了安全性。
是否存在没有绑定交换机的队列?
不存在,即使声明时没有绑定,RabbitMQ也会自动为队列绑定默认交换机。
RabbitMQ的组件?
大部分组件在前面已经介绍过了,如交换机,消息队列,绑定关系,还有部分组件如下:
channel.exchangeDeclare(exchangeName,exchangeType,true); //param1,交换机名字,param2,交换机类型,param3,是否持久化; channel.queueDeclare(queueName,true,false,false,null); //param1,队列名字,param2,是否持久化,param3,排他性,是否只能被一个channel访问 //param4,是否自动删除,随着最后一个消息被消费是否要删除本队列 //param5,附带参数,可以设置相关队列的信息,如死信队列,以map形式传递 channel.queueBind(queueName,exchangeName,routeKey); //param1,队列名,param2,交换机名,param3,绑定的路由keu channel.basicPublish(exchangeName,routeKey/queueName,args,message.getBytes()); //param1,交换机名称,param2,路由key或者队列名,在默认交换机中如果没有路由key则使用交换机名 //param3,消息的附属参数,在Header类型的交换机中可能会使用到 //message.getBytes(),消息的字节码数组,因为该API只能传输字节码数组 channel.basicQos(1); //手动应答情况下设置每次一次性消费的消息数 channel.basicConsume(queueName,true,new DeliverCallback(){ public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(queueName+"收到消息是:" + new String(message.getBody(), "UTF-8")); //channel.basciAck(message.getEnvelope().getDeliveryTag(),false); //如果是手动应答的话,这里便是消息消费完后给到queue的应答 } },new CancelCallback(){ public void handle(String consumerTag) throws IOException { System.out.println("接受消息失败了。。。。"); } }); //param1,消息队列名,param2,是否设置自动应答,param3,接收成功处理逻辑,param4,接收失败处理逻辑
编写相关配置:
spring.rabbitmq.host=xxx.xxx.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=username
spring.rabbitmq.password=password
spring.rabbitmq.virtual-host=/
生产端逻辑代码:
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; public void makeOrder(String userid,String productid,int num){ String orderid = UUID.randomUUID().toString(); System.out.println("成功生成订单:"+orderid); String exchange_name = "fanout_exchange"; String route_key = ""; rabbitTemplate.convertAndSend(exchange_name,route_key,orderid); //convertAndSend方法,第一个参数是交换机 //第二个参数是routekey,如果绑定的是默认交换机,则代表的是队列名字 //第三个参数是Object类型的,需要传递的消息 } }
编写相关配置类,声明交换机及相关队列,建立绑定关系:
@Configuration public class FanoutConfiguration { @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanout_exchange",true,false); //参数1:交换机名字 参数2:是否持久化 参数3:是否自动删除 } @Bean public Queue orderQueue(){ return new Queue("order.fanout.queue",true); //参数1:队列名字 参数2:是否持久化 } @Bean public Binding orderBinding(){ return BindingBuilder.bind(orderQueue()).to(fanoutExchange()); //使用BindingBuilder类协助绑定队列和交换机,同时使用Bean注解托管到Spring中 } }
消费端逻辑代码:
@RabbitListener(queues = {"order.fanout.queue"})
@Component
public class OrderConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("order_fanout---接收到了信息--->:"+message);
}
}
注意:消费端的代码编写一个类,可以自动监听目标队列中是否有需要推送的消息,需要使用@RabbitListener注解,queues属性用来声明监听的队列有哪些。同时使用@Component注解将其托管到Spring容器中,如果有消息则接受,调用@RabbitHandler注解的方法来处理,该注解标注的方法可以有多个,进入到处理逻辑的时候可以根据Message的类型进行方法选择;
SpringBoot整合下的消费端RabbitMQ是自动应答的嘛?默认情况下是自动应答的,可以通过更改配置设置手动应答。
过期队列与过期消息存在着一定的区别,对于过期消息的处理,过期队列可以将其发送到死信队列中,而单独设置的过期消息如果是存放在非过期队列中,只会被直接删除;
消息队列可以绑定一个死信交换机,通过这种方式将符合死信要求的消息推送到死信交换机上,再由死信交换机根据模式规则分发到绑定的队列上,就完成了死信的处理,后续只需要编写死信消费者监听死信队列及时处理死信即可;
这里的死信交换机指的是一个概念上的交换机,并不是真实存在的一个交换机类别,可以通过设置队列的某些属性来指定某个交换机成为该队列的死信交换机(队列一旦被创建就无法被修改,指无法通过代码的set方法来修改,这里应该是涉及到Spring一旦托管就不会再次创建来覆盖原先的设置)。
x-dead-letter-exchange = targetExchange
x-dead-letter-routing-key = targetRoutingKey
死信队列可以接收符合以下要求的消息:
从可视化界面中可以方便地观察到:Memory目前占用了132MiB,阈值为731MiB,磁盘目前剩余36GiB,阈值为48MiB;
内存阈值的设置,默认是设置为可用内存的40%,可以通过命令行或者配置文件进行修改,建议修改范围为0.4-0.7,不建议超过0.7;
内存换页:在某个Broker节点及内存阻塞生产者之前(默认是内存阈值的一半),它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以转移过程中持久化的消息会先从内存中清除。内存换页的值也可以人为地进行设置,默认是0.5,可以设置成小于1的值(如果设置成1触发内存换页的同时内存也预警RabbitMQ无法正常工作,没有意义);
设置:
启动第一个节点:
sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
#设置通讯端口为5672,节点名为rabbit-1
启动第二个节点:
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS=" -rabbitmq_managment listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
#设置通讯端口为5673,外部访问端口为15673,节点名为rabbit-2
设置rabbit-1为主节点:
# 停止应用(-n 节点名)
sudo rabbitmqctl -n rabbit-1 stop_app
# 目的是清除节点上的历史数据(如果不清除,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-1 reset
# 启动应用
sudo rabbitmqctl -n rabbit-1 start_app
设置rabbit-2为从节点:
# 停止应用
sudo rabbitmqctl -n rabbit-2 stop_app
# 目的是清除节点上的历史数据(如果不清除,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-2 reset
# 将rabbit2节点加入到rabbit1,集群当中【Server-node为服务器的主机名,根据实际情况进行修改即可】
sudo rabbitmqctl -n rabbit-2 join_cluster tabbit-1@'Server-node'
# 启动应用
sudo rabbitmqctl -n rabbit-2 start_app
为外部访问设置用户,密码:
rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"
在分布式的工作环境下,如果某一项功能的实现调用了多个服务,多个服务各自又对于数据库做出了自己的修改,当后一个服务的数据库更改操作失效了,传统的事务只能保证该操作可以被回滚,而无法回滚先前其他服务中的数据库操作,这就会导致数据库中的数据不一致,这也是分布式架构比较常见的一个问题。因此,怎么保证生产和消费的可靠成为了分布式架构下使用消息中间件的一个比较重要的问题。
如果消费过程中出现了异常,在开启了自动应答的消费队列中,会不断重发从而陷入死循环,可能会导致磁盘崩溃,此时可以采用以下的解决方法:
控制重发的次数,如果超过了该次数则将其转移到死信队列,对死信队列中的消息需要设置相应措施,不然可能出现消息丢失的问题;
spring.rabbitmq.listener.simple.acknowledge-mode = manual //开启手动ACK应答
spring.rabbitmq.listener.simple.retry.enable = true //开启重试
spring.rabbitmq.listener.simple.retry.max-attempts = 3 //设置最大重试次数
spring.rabbitmq.listener.simple.retry.initial-interval = 2000ms //设置重试间隔
使用try catch搭配手动ACK应答,如果出现异常的情况直接打入死信队列;try catch与重发的机制是互相矛盾的;
channel.basicAck(tag,false);
//正常应答,即消费成功,第二个参数为是否批量确认,如果为false,则确认的是当前的消息
//如果为true,则确认的是比当前消息的tag小的其他消息,即这里的tag是确认号
channel.basicNack(tag,false,false);
//非正常应答,消费端无法正常消费,返回给RabbitMQ
//这个方法比前面多了一个参数,为false的情况下,消息被放到死信队列
//如果是true,则是选择重发的策略,前面提到的方法就是使用true
在第二种解决方案的基础上搭配对于死信的处理;
消息会被重复消费,可能的原因是消息的可靠生产的重发过程中,最终队列中存在两个请求相同的消息,也有可能是消费端没收到消息请求重发最终收到了两个相同的消息。在这种情况下,可以通过以下的方式保证幂等性:
消费者获取到消息后先根据id去查询redis/db是否存在该消息
如果不存在,则正常消费,消费完毕后写入redis/db
如果存在,则证明消息被消费过,直接丢弃。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。