当前位置:   article > 正文

2024年最全高可靠性的消息队列 —— RabbitMQ,深入分析解读MySQL锁,解决幻读问题_rabbitmq mysql net

rabbitmq mysql net

技术学习总结

学习技术一定要制定一个明确的学习路线,这样才能高效的学习,不必要做无效功,既浪费时间又得不到什么效率,大家不妨按照我这份路线来学习。

最后面试分享

大家不妨直接在牛客和力扣上多刷题,同时,我也拿了一些面试题跟大家分享,也是从一些大佬那里获得的,大家不妨多刷刷题,为金九银十冲一波!

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

RabbitTemplate rabbitTemplate;

@Transactional

public void send() {

rabbitTemplate.setChannelTransacted(true);

// rabbitTemplate.convertAndSend(…);

// int i = 1 / 0; 没有爆发异常,由spring提交事务,否则回滚(也就是不发生消息)

}

}

但是使用事务有两个问题。

首先channel长时间处于阻塞:发布者必须依次等待broker处理每条消息。

不过有时候发布者只要知道broker宕机时哪些消息尚未处理就足够了。

其次是事务实现的繁重性:每次提交都需要一个 fsync(),这需要很多时间才能完成。

发布 10000 条消息需要 4 多分钟(具体参数机器性能决定,总之确实非常慢)

发送方确认机制


一旦通道进入确认模式,代理将在处理消息时确认消息。

由于这是异步完成的,生产者可以流式发布而不用等待代理,代理也可以有效地批量写入磁盘

  • 原生Java客户端

// 消息追踪记录(如果需要线程安全并且有序,可以使用 ConcurrentSkipListMap )

HashMap<Long, String> map = new HashMap<>();

// 必须显式开启

channel.confirmSelect();

// 监听被退回的消息(如消息路由到队列失败)

channel.addReturnListener(returnMessage -> {

System.out.println("return : " + System.currentTimeMillis());

try {

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

try {

TimeUnit.SECONDS.sleep(10);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.err.println(new String(returnMessage.getBody()) + " publish fail!");

});

// 监听被到达或未到达交换机(exchange)的消息

channel.addConfirmListener(new ConfirmListener() {

@Override

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

map.remove(deliveryTag); // 发送成功,缓存清除掉

}

@Override

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.err.println(map.get(deliveryTag) + “not ack!”);

// 下面可以进行重新发送等逻辑

}

});

try {

Random random = new Random();

int idx = 0;

while (idx < 1000) {

String message = “from server…” + (++idx);

// 追踪记录

map.put(channel.getNextPublishSeqNo(), message);

// 发送消息

channel.basicPublish(“”, queue, MessageProperties.PERSISTENT_BASIC, message.getBytes(StandardCharsets.UTF_8));

TimeUnit.MILLISECONDS.sleep(random.nextInt(300) + 200);

}

} finally {

System.out.println(“以下为未成功发送的消息”);

// 可以进行重试逻辑

map.values().forEach(s -> System.out.println("not ack, may need publish again : " + s));

}

  • SpringBoot AMQP

首先在配置文件中配置中开启消息发送方确认机制

spring:

rabbitmq:

publisher-returns: true

publisher-confirm-type: correlated

publisher-confirm-type有三种属性:

  1. none:表示禁用发布确认模式,默认即此。

  2. correlated:使用相关消息确认,回调中触发。

  3. simple:使用 waitForConfirms()waitForConfirmsOrDie() 方法的进行确认。

然后配置回调的监听器

@Configuration

public class PublisherConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

RabbitTemplate rabbitTemplate;

public PublisherConfirmConfig(RabbitTemplate rabbitTemplate) {

this.rabbitTemplate = rabbitTemplate;

}

//@Bean 此处无需注入

RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {

return new RabbitTransactionManager(connectionFactory);

}

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

System.out.println("correlationData : "+correlationData);

if (ack) {

System.out.println(“success”);

} else {

System.err.println("cause : "+cause);

}

}

@Override

public void returnedMessage(ReturnedMessage returned) {

System.err.println(returned);

}

@PostConstruct

public void initRabbitTemplate() {

rabbitTemplate.setConfirmCallback(this);

rabbitTemplate.setReturnsCallback(this);

}

}

发送消息:

// 注意必须传入 CorrelationData,否则没有根据去跟踪(原生client使用deliveryTag跟踪)

rabbitTemplate.convertAndSend(“q1”, (Object) s, new CorrelationData("correlation id = " + count));

MQ服务器存储消息不丢失

============================================================================

消费者消费消息不丢失

==========================================================================

关于ACK


RabbitMQ的Client

RabbitMQ中channel在消费消息(basicConsume(String queue, boolean autoAck, Consumer callback))的时候,指定的ack的含义如下:

  • autoAck = true

当broker在消息发送后(写入TCP套接字后)此条消息就立即ack了,此条消息RabbitMQ服务器也不再保存了,

而丝毫不管收到消息的客户端是否处理。如果消费者在收到大量消息但没有处理的时候突然宕机了,那么那些未处理消息也就随着本地缓冲区的消失而消失了(服务器上也没有了)。

这种ack方式谨慎使用。

  • autoAck = false

这种ack方式必须要求用户自己主动ack消息(channel.basicAck),常常和prefetchCount配合使用(后面会介绍到)。

Spring AMQP

需要注意的是Spring AMQP中配置的ack的含义和上面的ack含义是不一样的。

  • auto(default)

容器将根据侦听器是正常返回还是抛出异常来发出 ack/nack。

  • none

这里的none和rabbitmq的auto是一个含义。

  • manual

用户必须通过channel去ack/nack

配置方式

  • yaml配置

spring:

rabbitmq:

listener:

simple:

acknowledge-mode: auto

  • 方法级别(覆盖外部配置)

@RabbitListener(queues = “ququeName”, ackMode = “manual”)

Ack的相关api


deliveryTag(交付标签)

写在最后

很多人感叹“学习无用”,实际上之所以产生无用论,是因为自己想要的与自己所学的匹配不上,这也就意味着自己学得远远不够。无论是学习还是工作,都应该有主动性,所以如果拥有大厂梦,那么就要自己努力去实现它。

最后祝愿各位身体健康,顺利拿到心仪的offer!

由于文章的篇幅有限,所以这次的蚂蚁金服和京东面试题答案整理在了PDF文档里

蚂蚁、京东Java岗4面:原理+索引+底层+分布式+优化等,已拿offer

蚂蚁、京东Java岗4面:原理+索引+底层+分布式+优化等,已拿offer

蚂蚁、京东Java岗4面:原理+索引+底层+分布式+优化等,已拿offer

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

着自己学得远远不够。无论是学习还是工作,都应该有主动性,所以如果拥有大厂梦,那么就要自己努力去实现它。

最后祝愿各位身体健康,顺利拿到心仪的offer!

由于文章的篇幅有限,所以这次的蚂蚁金服和京东面试题答案整理在了PDF文档里

[外链图片转存中…(img-ZaNuwa2i-1715125916655)]

[外链图片转存中…(img-DjizDQ3O-1715125916655)]

[外链图片转存中…(img-gfDE7KZu-1715125916656)]

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号