赞
踩
①RabbitMQ是基于AMQP协议(Advanced Message Queuing Protocol)消息队列。
②AMQP协议:具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
③AMQP的核心概念:
Connection:publisher/consumer和broker之间的TCP连接。
Virhost host:把AMQP的基本组件划分到一个虚拟的分组中,同一个RabbitMQServer有多个vhost,每一个用户都在自己的vhost创建exchange/queue。
Broker:接受和分发消息的应用,RabbitMQ Serve就是Message Broker。
Channel:Channel是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
Exchange:message去broker的第一站,根据分发规则匹配routing key分发到queue去。(常用类型:direct (point-to-point), topic (publish-subscribe) and fanout (multicast))。
Queue:消息最终被送到consumer取走。
Binding:exchange和queue之间的虚拟连接。binding中包含routing key。Binding消息被保存到exchange中的查询表,用于message的分发。
#rabbitmq默认端口号5672;内部是15672 #修改rabbitmq默认配置信息路径 /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app #默认安装路径 /usr/share/doc/rabbitmq-server-3.6.5 #日志路径 /var/log/rabbitmq/rabbit@charon.log lsof -i:5672 -- 查看是否启动rabiitmq rabbitmq-plugins enable 插件名(rabbitmq_management) --开启rabbitmq的插件(开启管理台) #默认配置文件 可以修改到/etc/rabbitmq/rabbitmq.config 管理台即可查看 /usr/share/doc/rabbitmq-server-3.6.5/rabbitmq.config.example service rabbitmq-server start --开启RabbitMQ service rabbitmq-server stop --关闭RabbitMQ service rabbitmq-server restart --重启RabbitMQ rabbitmqctl list_ queues --查看队列 rabbitmqctl list_ exchanges -- 查看exchanges rabbitmqctl list_ _users -- 查看用户 rabbitmqctl list_ consumers -- 查看消费者信息 rabbitmqctl environment -- 查看环境变量 rabbitmqctl list. queues name messages_ unacknowledged -- 查看未被确认的队列 rabbitmqctl list_ queues name memory -- 查看单个队列的内存使用 rabbitmqctl list_ queues name messages_ ready -- 查看准备就绪的队列
RabbitMQ的工作模式(简单模式、工作模式、PubSub订阅模式、Routing路由模式、Topics主题模式、PRC模式)
如果没有指定exchange,那么默认采用AMQP defuault
#MQ的发送端代码 public class Producer_send { public static void main(String[] args) throws IOException, TimeoutException { // 1.创建工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2.设置参数 connectionFactory.setHost("192.168.20.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/itcast"); connectionFactory.setUsername("charon"); connectionFactory.setPassword("charon"); // 3.创建连接Connection Connection connection = connectionFactory.newConnection(); // 4.创建channel Channel channel = connection.createChannel(); // 5.创建队列Queue /*String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * 参数: * 1.queue:队列名称 如果没有一个该队列名则会创建一个叫这名字的队列;如果有就不会创建。 * 2.durable:是否持久化:当mq重启后还存在 * 3.exclusive:(1)是否独占,只能有一个消费者监听。(2)当connection关闭是否关闭队列 * 4.autoDelete:是否自动删除,当没有consumer时自动删除 * 5.arguments:参数信息 * */ final String QUEUQ_NAME = "hello,worid"; channel.queueDeclare(QUEUQ_NAME,true,false,false,null); // 6.发送消息 /*String exchange, String routingKey, BasicProperties props, byte[] body * 参数: * 1.exchange:交换机名称。简单模式下会使用默认的"" * 2.routingKey:路由名称 * 3.props:配置信息 * 4.body:字节信息(发送消息信息) * */ String body = "hello second rabbitmq"; channel.basicPublish("",QUEUQ_NAME,null,body.getBytes()); // 7.释放资源 channel.close(); connection.close(); } } #MQ的消费端代码 public class Consumer_accept { public static void main(String[] args) throws IOException, TimeoutException { // 1.创建工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2.设置参数 connectionFactory.setHost("192.168.20.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/itcast"); connectionFactory.setUsername("charon"); connectionFactory.setPassword("charon"); // 3.创建连接Connection Connection connection = connectionFactory.newConnection(); // 4.创建channel Channel channel = connection.createChannel(); // 5.创建队列Queue /*String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * 参数: * 1.queue:队列名称 如果没有一个该队列名则会创建一个叫这名字的队列;如果有就不会创建。 * 2.durable:是否持久化:当mq重启后还存在 * 3.exclusive:(1)是否独占,只能有一个消费者监听。(2)当connection关闭是否关闭队列 * 4.autoDelete:是否自动删除,当没有consumer时自动删除 * 5.arguments:参数信息 * */ final String QUEUQ_NAME = "hello,worid"; channel.queueDeclare(QUEUQ_NAME,true,false,false,null); // 6.接受消息 /*String queue, boolean autoAck, Consumer callback * 1.queue:队列名称 * 2.autoAck:是否自动确认 * 3.callback:回调函数 * */ Consumer consumer = new DefaultConsumer(channel){ /** * 回调函数 接受消息后自动执行 * @param consumerTag:标识 * @param envelope:获取一些信息,交换机、路由key * @param properties:配置信息 * @param body:数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("getExchange:"+envelope.getExchange()); System.out.println("getRoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("msg:"+new String(body)); } }; channel.basicConsume(QUEUQ_NAME,true,consumer); } }
work Queues工作模式: 比简单模式多了一些消费端;多个消费者共同消费同一个队列的消息(竞争)。
应用场景:对于任务过重情况提高处理速度。
代码书写和简单模式一样。
说明:生产者将消息不在发送到队列中而是发给交换机,交换机不具备存储消息能力只负责。将消息转发给消费者或者丢弃。
EXCHANGE:一边接收生产者消息另一边知道如何处理消息。处理消息取决于交换机类型。
1.Faount:广播:将消息传给所有绑定到交换机的队列
2.Diret:定向:将消息传给指定Routing Key的队列
3.Topic:通配符:交给routing pattern(路由协议)的队列
补充: Fanout Exchange:不处理路由键,将队列绑定到交换机上 速度是最快的。
##pubsub订阅模式的生产者 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2.设置参数 connectionFactory.setHost("192.168.20.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/itcast"); connectionFactory.setUsername("charon"); connectionFactory.setPassword("charon"); // 3.创建连接Connection Connection connection = connectionFactory.newConnection(); // 4.创建channel Channel channel = connection.createChannel(); // 5.创建交换机 /*String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments *参数 * 1.exchange:交换机名称 * 2.type:交换机的类型 * DIRECT("direct"):定向 * FANOUT("fanout"):广播 * TOPIC("topic"):通配符方式 * HEADERS("headers"):参数匹配 * 3.durable:是否持久化 * 4.autoDelete:自动删除 * 5.internal:内部使用:false * 6.arguments:参数 * */ final String EXCHANGE_NAME = "test_fanout"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true,false,false,null); // 6.创建队列 final String QUEUQ_NAME_ONE = "test_faount_queue_one"; final String QUEUQ_NAME_TWO = "test_faount_queue_two"; channel.queueDeclare(QUEUQ_NAME_ONE,true,false,false,null); channel.queueDeclare(QUEUQ_NAME_TWO,true,false,false,null); // 7.绑定交换机和队列的关系 /*String queue, String exchange, String routingKey * 参数 * 1.queue:队列名称 * 2.exchange:交换机名称 * 3.routingKey:路由键:绑定规则 * 如果交换机的类型是FANOUT,那么路由键为空 * */ channel.queueBind(QUEUQ_NAME_ONE,EXCHANGE_NAME,""); channel.queueBind(QUEUQ_NAME_TWO,EXCHANGE_NAME,""); // 8.发送消息 final String body = "这是广播类型交换机信息"; channel.basicPublish(EXCHANGE_NAME,"",null,body.getBytes()); // 9.释放资源 channel.close(); connection.close(); } ##pubsub订阅模式的第一个消费者 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2.设置参数 connectionFactory.setHost("192.168.20.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/itcast"); connectionFactory.setUsername("charon"); connectionFactory.setPassword("charon"); // 3.创建连接Connection Connection connection = connectionFactory.newConnection(); // 4.创建channel Channel channel = connection.createChannel(); // 5.创建队列Queue final String QUEUQ_NAME_ONE = "test_faount_queue_one"; final String QUEUQ_NAME_TWO = "test_faount_queue_two"; // 6.接受消息 Consumer consumer = new DefaultConsumer(channel) { /** * 回调函数 接受消息后自动执行 * * @param consumerTag:标识 * @param envelope:获取一些信息,交换机、路由key * @param properties:配置信息 * @param body:数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("msg:" + new String(body)); System.out.println("这是第一个队列的信息"); } }; channel.basicConsume(QUEUQ_NAME_ONE, true, consumer); } ###第二个消费者只要监听另一个队列即可
队列和交换机不在是任意绑定而是指定绑定;消息发往exchange时候也需要指定Routingkey;exchange不在是将每一个消息队列而是根据RoutingKey去判断;只有RoutingKey完全一致才可以接受消息。
##routing路由模式的生产者 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2.设置参数 connectionFactory.setHost("192.168.20.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/itcast"); connectionFactory.setUsername("charon"); connectionFactory.setPassword("charon"); // 3.创建连接Connection Connection connection = connectionFactory.newConnection(); // 4.创建channel Channel channel = connection.createChannel(); // 5.创建交换机 final String EXCHANGE_NAME = "test_direct"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true,false,false,null); // 6.创建队列 final String DIRECT_NAME_ONE = "test_direct_queue_one"; final String DIRECT_NAME_TWO = "test_direct_queue_two"; channel.queueDeclare(DIRECT_NAME_ONE,true,false,false,null); channel.queueDeclare(DIRECT_NAME_TWO,true,false,false,null); // 7.绑定交换机和队列的关系 //队列1的绑定 channel.queueBind(DIRECT_NAME_ONE,EXCHANGE_NAME,"error"); //队列2的绑定 channel.queueBind(DIRECT_NAME_TWO,EXCHANGE_NAME,"info"); channel.queueBind(DIRECT_NAME_TWO,EXCHANGE_NAME,"waring"); channel.queueBind(DIRECT_NAME_TWO,EXCHANGE_NAME,"error"); // 8.发送消息 final String body = "这是路由定向类型交换机信息"; channel.basicPublish(EXCHANGE_NAME,"error",null,body.getBytes()); // 9.释放资源 channel.close(); connection.close(); } ##Routing路由模式的消费者 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2.设置参数 connectionFactory.setHost("192.168.20.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/itcast"); connectionFactory.setUsername("charon"); connectionFactory.setPassword("charon"); // 3.创建连接Connection Connection connection = connectionFactory.newConnection(); // 4.创建channel Channel channel = connection.createChannel(); // 5.创建队列Queue final String DIRECT_NAME_ONE = "test_direct_queue_one"; final String DIRECT_NAME_TWO = "test_direct_queue_two"; // 6.接受消息 Consumer consumer = new DefaultConsumer(channel) { /** * 回调函数 接受消息后自动执行 * * @param consumerTag:标识 * @param envelope:获取一些信息,交换机、路由key * @param properties:配置信息 * @param body:数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("msg:" + new String(body)); System.out.println("这是Routingkey为error的信息"); } }; channel.basicConsume(DIRECT_NAME_ONE, true, consumer); }
Topic主题模式可以实现PubSub和Routing的功能,只有Topic在配置routingkey时候可以使用通配符,显得更加灵活。( *只能匹配一个词;#可以匹配多个词 )
##Topics通配符的生产者 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2.设置参数 connectionFactory.setHost("192.168.20.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/itcast"); connectionFactory.setUsername("charon"); connectionFactory.setPassword("charon"); // 3.创建连接Connection Connection connection = connectionFactory.newConnection(); // 4.创建channel Channel channel = connection.createChannel(); // 5.创建交换机 final String EXCHANGE_NAME = "test_topics"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,true,false,false,null); // 6.创建队列 final String QUEUQ_NAME_ONE = "test_topics_queue_one"; final String QUEUQ_NAME_TWO = "test_topics_queue_two"; channel.queueDeclare(QUEUQ_NAME_ONE,true,false,false,null); channel.queueDeclare(QUEUQ_NAME_TWO,true,false,false,null); // 7.绑定交换机和队列的关系 channel.queueBind(QUEUQ_NAME_ONE,EXCHANGE_NAME,"#.error"); channel.queueBind(QUEUQ_NAME_ONE,EXCHANGE_NAME,"linux.*"); channel.queueBind(QUEUQ_NAME_TWO,EXCHANGE_NAME,"*.*"); // 8.发送消息 final String body = "这是广播类型交换机信息"; channel.basicPublish(EXCHANGE_NAME,"rabbitmq.linux",null,body.getBytes()); // 9.释放资源 channel.close(); connection.close(); } ##Topics模式的消费者 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2.设置参数 connectionFactory.setHost("192.168.20.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/itcast"); connectionFactory.setUsername("charon"); connectionFactory.setPassword("charon"); // 3.创建连接Connection Connection connection = connectionFactory.newConnection(); // 4.创建channel Channel channel = connection.createChannel(); // 5.创建队列Queue final String QUEUQ_NAME_ONE = "test_topics_queue_one"; final String QUEUQ_NAME_TWO = "test_topics_queue_two"; // 6.接受消息 Consumer consumer = new DefaultConsumer(channel) { /** * 回调函数 接受消息后自动执行 * * @param consumerTag:标识 * @param envelope:获取一些信息,交换机、路由key * @param properties:配置信息 * @param body:数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("msg:" + new String(body)); System.out.println("这是监听#.error和linux.*的信息"); } }; channel.basicConsume(QUEUQ_NAME_ONE, true, consumer); }
生产端配置XML信息如下:在使用时候只需要注入RabbitTemplate对象使用其convertAndSend方法即可。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:properties/rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机 默认交换机类型为direct,名字为:"",路由键为队列的名称 --> <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/> <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_fanout_queue_one" name="spring_fanout_queue_one" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_fanout_queue_two" name="spring_fanout_queue_two" auto-declare="true"/> <!--定义广播类型交换机;并绑定上述两个队列--> <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="spring_fanout_queue_one"/> <rabbit:binding queue="spring_fanout_queue_two"/> </rabbit:bindings> </rabbit:fanout-exchange> <!-- Routing路由方式--> <!--<rabbit:direct-exchange name="xxx_direct"> <rabbit:bindings> <rabbit:binding queue="xxx_queue" key="RoutingKey" ></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange>--> <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_one" name="spring_topic_queue_one" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_two" name="spring_topic_queue_two" auto-declare="true"/> <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding pattern="charon.*" queue="spring_topic_queue_star"/> <rabbit:binding pattern="charon.#" queue="spring_topic_queue_one"/> <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_two"/> </rabbit:bindings> </rabbit:topic-exchange> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> </beans>
消费端配置XML信息如下:在使用时候只需要继承MessageListener接口,重写onMessage方法即可。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:properties/rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <bean id="springQueueListener" class="com.charon.consumer.SpringQueueListener"/> <bean id="fanoutListenerOne" class="com.charon.consumer.FanoutListener"/> <bean id="fanoutListenerTwo" class="com.charon.consumer.FanoutListenerClone"/> <bean id="topicListenerStar" class="com.charon.consumer.TopicListenerStar"/> <bean id="topicListenerOne" class="com.charon.consumer.TopicListener"/> <bean id="topicListenerTwo" class="com.charon.consumer.TopicListenerClone"/> <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true"> <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/> <rabbit:listener ref="fanoutListenerOne" queue-names="spring_fanout_queue_one"/> <rabbit:listener ref="fanoutListenerTwo" queue-names="spring_fanout_queue_two"/> <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/> <rabbit:listener ref="topicListenerOne" queue-names="spring_topic_queue_one"/> <rabbit:listener ref="topicListenerTwo" queue-names="spring_topic_queue_two"/> </rabbit:listener-container> </beans>
生产端配置类:
@Configuration public class RabbitMQConfig { public final static String EXCHANGE_NAME = "boot_topic_exchange"; public final static String QUEUE_NAME = "boot_topic_queue"; /** * 1.创建交换机 * @return 交换机 */ @Bean("bootExchange") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } /** * 2.创建Queue队列 * @return Queue队列 */ @Bean("bootQueue") public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } /** * 3.队列和交换机绑定关系 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } }
消费端只需要添加@RabbitListener注解:
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_topic_queue")
public void messageListener(Message message){
// System.out.println("eeeee");
System.out.println(new String(message.getBody()));
}
}
消息可靠性方法:1.持久化 2.生产确认Confirm3.消费方Ack4.broker高可用。
rabbitmq整个消息投递的路径为:
producer—> rabbitmq broker—> exchange—> queue—> consumer
生产端的可靠性投递:①保证消息的成功发送②保证MQ节点成功接收③发送端收到MQ节点确认答应④完善的消息补偿机制
(1)生产端设有confirmCallback 和returnCalback来确认消息是否投递成功。先在xml配置中配置publisher-confirms和publisher-returns。
●消息从producer到exchange则会返回-个confirmCallback 。
@Autowired private RabbitTemplate rabbitTemplate; /** * 确认模式 * 步骤: * 1.确认模式开启:在connectionFactory设置publisher-confirms="true" * 2.在rabbitTemplate中定义confirmCallback回调函数 */ @Test public void testConfirm(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 配置信息 * @param ack exchange交换机是否成功收到小心 true:成功;false:失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("producer-> exchange;confirmCallback回调函数被执行"); if(ack){ System.out.println("成功接收到消息"); }else { System.out.println("失败原因:"+cause); } } }); rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","hello confirmCallback"); }
●消息从exchange–> queue投递失败则会返回一一个returnCalback。
/** * 回退模式:当消息发送给exchange后,由exchange到queue失败时候才会执行 * 步骤: * 1.回退模式开启:在connectionFactory设置ppublisher-returns="true" * 2.设置returnCallBack * 3.设置exchange处理消息的模式; * 1.消息没有路由到queue,消息丢弃 * 2.消息没有路由到queue,返回给消息的发送方 */ @Test public void testReturn(){ // 设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true); /** * * @param message 消息对象 * @param replyCodse 失败码 * @param replyText 失败信息 * @param exchange 交换机 * @param routingKey 路由键 */ rabbitTemplate.setReturnCallback((Message message,int replyCodse,String replyText,String exchange, String routingKey)->{ System.out.println("exchange -> queue;returnCallback执行了"); System.out.println("消息对象"+message); System.out.println("失败码"+replyCodse); System.out.println("失败信息"+replyText); System.out.println("交换机"+exchange); System.out.println("路由键"+routingKey); }); rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","hello returnCallback"); }
(2)消费端确认机制:xml配置为手动签收并且继承ChannelAwareMessageListener重写onMessage()方法。
/** * @program: rabbitMQ * @description 消费端的消息确认 * Consumer Ack机制: * 1.设置手动签收机制acknowledge="manual" * 1.acknowledge="auto" 自动签收 异常被丢弃 * 2.acknowledge="manual" 手动签收 * 3.acknowledge="none" 根据异常签收 * 2.让监听器实现ChannelAwareMessageListener接口 * 3.消息成功处理后调用channel.basicAck()接受 * 4.消息异常调用channel.basicNack()拒绝签收,让broker重新发送 * @author: charon * @create: 2020-11-19 22:15 **/ @Component public class AckListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(1000); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println(new String(message.getBody())); //模拟出错 int i = 3/0; // 手动签收 channel.basicAck(deliveryTag,true); }catch (Exception e){ // 第三个参数requeue;重回队列 channel.basicNack(deliveryTag,true,true); //channel.basicReject(deliveryTag,true); } } }
为了防止高峰时期,MQ运行能力处理不过来,我们可以采用消费端限流来使得MQ每次消费固定数量,等消费完再消费下一批来保证消息处理完毕。
/** * @program: rabbitMQ * @description Consumer的限流机制 * 1.确保Ack的机制为手动确认 * 2.listener-container的配置属性 * refetch = "1",表示消费端每次都从mp拉去一条消息,直到手动消息确认完毕,才会接收下一条消息。 * @author: charon * @create: 2020-11-25 21:06 **/ @Component public class QosListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(1000); System.out.println(new String(message.getBody())); long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag,true); } }
首先定义消息的生命周期
<!--TTL消息的生命周期-->
<rabbit:queue id="test_queue_ttl" name="test_queue_ttl">
<rabbit:queue-arguments>
<!--如果是数字类型一定要设置value-type不然报错
x-message-ttl队列过期时间-->
<entry key="x-message-ttl" value="20000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
TTL:消息的生命周期存在两种情况:1.只在消息上;2.只在队列中。
/** * TTL:消息的生命周期 * 1.队列消息过期 * 设置queue-arguments参数key="x-message-ttl" * 2.消息单独过期 * * 如果设置了消息队列过期时间,也设置了消息单独过期时间。以时间短的为准 * 队列过期后,会将队列中所有消息全部移除 * 消息过期后,只有消息在队列顶端时候,才回去判断其是否过期(移除) */ @Test public void testTTL(){ // 1).队列消息过期 /*for(int i = 0;i<10; i++){ rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.message","hello TTL"); }*/ // 2).消息单独过期 // 消息后处理对象,设置一些消息的参数 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { /** * * @param message 信息对象 * @return * @throws AmqpException */ @Override public Message postProcessMessage(Message message) throws AmqpException { // 消息过期时间 message.getMessageProperties().setExpiration("5000"); return message; } }; for(int i = 0;i<10; i++){ if(i == 5){ // 过期消息 rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.message","hello TTL",messagePostProcessor); }else { rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.message","hello TTL NO"); } } }
死信队列(DLX–Dead Letter Exchange ):当消息成为Dead message可以被重新发送到另一个交换机,这个交换机就是DLX。
RabbitMQ的存在三种情况会使得消息变成死信队列: 1.过期时间、 2.长度限制、3.消息拒收。
死信队列是将正常队列和私和死信交换机绑定,所以我们需要有正常和死信队列、交换机。
<!--死信队列: 1.声明正常队列和交换机 2.声明死信队列的队列和交换机 3.正常队列和私和死信交换机绑定 设置两个参数: x-dead-letter-exchange:死信交换机名称 x-dead-letter-routing-key:死信交换机RoutingKey名称--> <!--1.声明正常队列和交换机--> <rabbit:queue id="test_queue_dlx" name="test_queue_dlx"> <rabbit:queue-arguments> <!--3.1设置死信交换机名称参数--> <entry key="x-dead-letter-exchange" value="exchange_dlx"></entry> <!--3.2设置死信交换机RoutingKey名称参数--> <entry key="x-dead-letter-routing-key" value="dlx.message"></entry> <!--4.1设置队列过期时间--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> <!--4.2设置队列长度限制--> <entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="test_dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--2.声明死信队列的队列和交换机--> <rabbit:queue id="queue_dlx" name="queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
** * 发送测试死信消息: * 1.过期时间 * 2.长度限制 * 3.消息拒收 */ @Test public void testDlx(){ // 1.过期时间 // rabbitTemplate.convertAndSend("test_exchange_dlx","test_dlx.message","message To Dlx"); // 2.长度限制 /*for(int i = 0 ;i<12;i++){ rabbitTemplate.convertAndSend("test_exchange_dlx","test_dlx.message","message_To_Dlx over_limit"); }*/ // 3.消息拒收 rabbitTemplate.convertAndSend("test_exchange_dlx","test_dlx.message","message To Dlx no_consumer"); }
RabbitMQ并没有存在直接实现延投递的方法,但是我们可以利用TTL和死信来实现消息的延迟投递。
<!--延迟队列: 1.声明正常队列和交换机 2.声明死信队列的队列和交换机 3.正常队列和私和死信交换机绑定,设置过期时间--> <rabbit:queue id="message_queue" name="message_queue"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> <entry key="x-dead-letter-exchange" value="message_exchange_dlx"></entry> <entry key="x-dead-letter-routing-key" value="dlx.message.cannel"></entry> </rabbit:queue-arguments> </rabbit:queue> <rabbit:topic-exchange name="message_exchange"> <rabbit:bindings> <rabbit:binding pattern="message.#" queue="message_queue"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <rabbit:queue id="message_queue_dlx" name="message_queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="message_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.message.#" queue="message_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
消费端一定要监听死信队列名!
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>-->
<!--<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>-->
<!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>-->
<!--延迟队列一定要监听死信队列名-->
<rabbit:listener ref="delayListener" queue-names="message_queue_dlx"></rabbit:listener>
</rabbit:listener-container>
(1)IDEA中如何简单实用GIT查看版本控制和分支
(2)代码链接(采用git的版本控制和分支独立): https://github.com/charonry/rabbitMQ
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。