赞
踩
RabbitMQ的高级特性:
RabbitMQ提供了两种模式帮助我们解决消息可靠性投递:
①Confirm消息确认模式机制:消息确认模式,是指生产者发送消息给broker,然后broker会给生产者一个响应,生产者接受应答,用来确定该消息是否正常发送到broker,这种模式也是可靠性投递的核心保障。
Confirm模式确认消息的代码实现:
1)创建一个配置文件application.yml
spring:
rabbitmq:
host: rabbitmq服务所在的ip地址
#设置消息确认的类型为手动确认
publisher-confirm-type: correlated
2)创建你的producer生产者测试代码:
//注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void test2(){ //设置消息确认Confirm模式 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //消息确认ack默认为true,如果为fasle,则提示生产者继续发送消息 if (ack==false){ System.out.println("继续发现消息"); } } }); for (int i = 0; i < 10; i++) { //生产者发送给broker的消息 rabbitTemplate.convertAndSend("lp_exchange_direct","error","你好"); } }
②Return返回消息模式机制:Return Listener用于处理一些不可路由的消息。消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后消费者监听队列,进行消费处理操作,但是某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可到达的消息,就需要使用Ruturn Listener!
Return模式消息返回的代码实现:
1)创建一个application.yml配置文件
spring:
rabbitmq:
host: rabbitmq服务所在的IP地址
#表示开启消息返回模式
publisher-returns:true
2)代码实现:
//注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void test1(){ //设置ReturnsCallback消息返回模式 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { //returnedMessage 包含了一些消息,回复代码,回复文本等参数 System.out.println("消息失败的原因是"+returnedMessage.getReplyText()); } }); //发送的消息 rabbitTemplate.convertAndSend("lp_exchange_direct","error2","hello"); }
Consumer ACK,表示消费端收到消息后的确认方式,其中自动确认是指,当消息一旦被Consumer接收到后,就会自动确认,并将相对应的message从RabbitMQ的消息队列中移除。但是在实际的业务处理中,有些消息可能接收到,但是业务处理出现了问题,那么该消息就会消失。如果设置了手动方式,则需要在业务处理成功后,调用channel.basicAck(),手动确认,如果出现异常,则调用channel.basicNack()方法,让消息自动重新发送。
Consumer ACK的代码实现:
1)创建一个application.yml配置文件
Spring:
rabbitmq:
host: rabbitmq所在的IP地址
listener:
simple:
#表示手动确认
acknowledge-mode: manual
2)代码实现:
@Component public class MyListener { @RabbitListener(queues = "lp_routing02") public void listener(Message message, Channel channel)throws Exception{ long deliveryTag = message.getMessageProperties().getDeliveryTag(); byte[] body = message.getBody(); String msg=new String(body); System.out.println(msg); try { // long deliveryTag 表示标识 // boolean multiple 是否允许多确认 channel.basicAck(deliveryTag,true); } catch (Exception e) { //long deliveryTag 表示标识 // boolean multiple 是否允许多确认 //boolean requeue 是否让队列再次发送该消息。 channel.basicNack(deliveryTag,true,false); } } }
①为什么对消费端限流
当数据量特别大的时候,我们对生产端限流肯定是不正确的,因为有时候并发量很大,有时候并发量又很小,我们就会无法约束生产段,所以我们就需要对消费端限流,用于保持消费端的稳定当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。
②如何对消费端进行限流呢?
1.将确认设置为手动确认
2.配置限流的格式
1)创建一个application.yml配置文件
Spring:
rabbitmq:
host: rabbitmq服务的IP地址
listener:
simple:
#配置确认方式为手动
acknowledge-mode: manual
#配置每次消费的个数
prefetch: 10
2)代码实现:
@Component public class MyListener { @RabbitListener(queues = "lp_routing02") public void listener(Message message, Channel channel)throws Exception{ long deliveryTag = message.getMessageProperties().getDeliveryTag(); byte[] body = message.getBody(); String msg=new String(body); System.out.println(msg); try { // long deliveryTag 表示标识 // boolean multiple 是否允许多确认 //channel.basicAck(deliveryTag,true); } catch (Exception e) { //long deliveryTag 表示标识 // boolean multiple 是否允许多确认 //boolean requeue 是否让队列再次发送该消息。 channel.basicNack(deliveryTag,true,false); } } }
TTL,Time To Live的缩写,生存时间的意思。RabbitMQ支持消息的过期时间,在发送消息时可以指定;RabbitMQ也支持队列的过期时间,从消息入队列开始计算,只要超过队列的超时时间配置,那么消息就会自动的清除。
①消息的TTL
我们在生产端发送消息的时候可以在properties中指定expiration
属性来对消息过期时间设置,单位为毫秒(ms);
/** * deliverMode 设置为 2 的时候代表持久化消息 * expiration 意思是设置消息的有效期,超过10秒没有被消费者接收后会被自动删除 * headers 自定义的一些属性 * */ //5. 发送 Map<String, Object> headers = new HashMap<String, Object>(); headers.put("myhead1", "111"); headers.put("myhead2", "222"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("100000") .headers(headers) .build(); String msg = "test message"; channel.basicPublish("", queueName, properties, msg.getBytes());
我们也可以在图形化页面中进入Exchange
发送指定的expiration
②队列的TTL
我们可以在图形化页面中新增一个queue
,创建时可以设置ttl,对于队列中超过该时间的消息将会被移除
@Configuration public class RabbitConfig { private final static String EXCHANEG_NAME="exchange_lipeng"; private final static String QUEUE_NAME="queue_lipeng"; @Bean public Exchange getExchange(){ Exchange build = ExchangeBuilder.fanoutExchange(EXCHANEG_NAME).durable(true).build(); return build; } @Bean public Queue getQueue(){ Queue build = QueueBuilder.durable(QUEUE_NAME).build(); return build; } @Bean public Binding getBinding(Queue queue ,Exchange exchange){ Binding info = BindingBuilder.bind(queue).to(exchange).with("").noargs(); return info; } }
死信队列,缩写DLX(Dead Letter Exchange)死信交换机,当消息成为Dead Message后,可以被重新发送给到另一个交换机.
消息成为死信的情况:
1,队列消息长度到达限制
2,消费者拒绝消费消息,basicNack/basicReject,并且不把消息重新放入原目标列,requeue=false;
3,原队列存在消息过期设置,消息到达时间未被消费
队列绑定死信交换机:
给队列设置参数:x-dead-letter-exchange和x-dead-letter-routing-key
代码实现:
①配置类代码
@Configuration public class RabbitConfig { private final String EXCHANGE="exchange001"; private final String DEAD_EXCHANGE="dead_exchange001"; private final String QUEUE="queue001"; private final String DEAD_QUEUE="dead_queue00"; @Bean public Queue queue(){ return QueueBuilder .durable(QUEUE) .withArgument("x-message-ttl",20000) .withArgument("x-max-length",10) .withArgument("x-dead-letter-exchange",DEAD_EXCHANGE) .withArgument("x-dead-letter-routing-key","error") .build(); } @Bean public Exchange exchange(){ return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build(); } @Bean public Queue dead_queue(){ return QueueBuilder.durable(DEAD_QUEUE).build(); } @Bean public Exchange dead_exchange(){ return ExchangeBuilder.directExchange(EXCHANGE).build(); } @Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(exchange()).with("error").noargs(); } @Bean public Binding dead_binding(){ return BindingBuilder.bind(dead_exchange()).to(dead_exchange()).with("error").noargs(); } }
②启动类代码
@SpringBootApplication
public class SpringBootRabbit01Application {
public static void main(String[] args) {
SpringApplication.run(SpringBootRabbit01Application.class, args);
}
}
③发送消息测试类
@SpringBootTest
class SpringBootRabbit01ApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("exchange001","error","你好");
}
}
效果图:
如何保证消息幂等性?
①让每一个消息都携带一个全局的唯一ID
1)消费者获取消息后先根据id去查询redis是否存在该消息
2)如果不存在,则正常消费,消费完毕后写入redis
3)如果存在,则证明消息被消费过,直接丢弃
代码实现:
@Component @RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true")) public class Consumer { @RabbitHandler public void receiveMessage(Message message) throws Exception { Jedis jedis = new Jedis("localhost", 6379); String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(),"UTF-8"); System.out.println("接收到的消息为:"+msg+"==消息id为:"+messageId); String messageIdRedis = jedis.get("messageId"); if(messageId == messageIdRedis){ return; } JSONObject jsonObject = JSONObject.parseObject(msg); String email = jsonObject.getString("message"); jedis.set("messageId",messageId); } }
延迟队列,是指消息进入队列后不会立即被消费,而是只有到达指定事件后,才会被消费
注意:
在rabbitmq中并没有提供延迟队列的功能,可以使用TTL+死信队列组合实现延迟队列的效果。
1)代码实现
@Test
public void testDelay() throws InterruptedException {
//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend("order_exchange",
"order.msg","订单信息:id=1,time=2019年");
//2.打印倒计时10秒
for (int i = 10; i > 0 ; i--) {
System.out.println(i+"...");
Thread.sleep(1000);
}
}
2)
①延迟队列是指消息进入队列后可以被延迟一段时间,然后再被消费,
②rabbitmq中并没延迟队列的功能,可以使用TTL和DLX实现延迟队列的效果。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。