当前位置:   article > 正文

rabbitmq 消息队列_rabbitmq 普通队列死信队列

rabbitmq 普通队列死信队列

一、MQ 的选择

1.Kafka

Kaika 主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选kafka了。尚硅谷官网kafka视频连接http://www.gulixueyuan.com/course/ 330/tasks

2.RocketMQ

RocketMQ天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削 峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。

3.RabbitMQ

RabbitMQ结合 erlang语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。

二、Rabbitmq的核心:

生产者,交换机,队列,消费者

三、手动确认

消息接收后,为手动确认,该条消息会回到队列中

四、持久化

队列持久化和消息持久化,虽然持久化,也会丢失,需要在发布确认中优化

消息丢失:会在保存到磁盘的那一瞬间丢失,

五、Rabbitmq的分发模式

1、默认为轮询分发

channel.basicQos(0); 或者不设置,即为默认为轮询分发

2、不公平分发

channel.basicQos(1);

3、预取值

channel.basicQos(5); 预取5条,只能保证一开始的预取

六、发布确认策略

构造rabbitmq连接

  1. /**
  2. * 创建rabbitmq连接
  3. * @return
  4. */
  5. public static Channel getChannel() {
  6. ConnectionFactory factory = new ConnectionFactory();
  7. // factory.setHost("147.23.23.33");
  8. factory.setHost("172.16.75.388");
  9. factory.setPort(5672);
  10. factory.setUsername("admin");
  11. // factory.setPassword("123456");
  12. factory.setPassword("admin");
  13. Channel channel = null;
  14. try{
  15. Connection connection = factory.newConnection();
  16. channel = connection.createChannel();
  17. }catch (Exception e){
  18. System.out.println("e.getMessage() = " + e.getMessage());
  19. }
  20. return channel;
  21. }

1、单条发布确认

  1. // 1、单个确认
  2. public static void publishSingleConfirm() throws Exception {
  3. Channel channel = RabbitMqUtil.getChannel();
  4. channel.queueDeclare("hello",true,false,false,null);
  5. //开启发布确认
  6. channel.confirmSelect();
  7. //开始时间
  8. long start = System.currentTimeMillis();
  9. for (Integer i = 0; i < MESSAGE_COUNT; i++) {
  10. String msg = i+"";
  11. channel.basicPublish("","hello",null,msg.getBytes());
  12. //消息发布确认
  13. boolean flag = channel.waitForConfirms();
  14. if(flag){
  15. System.out.println( i + "--消息确认成功");
  16. }
  17. }
  18. //结束时间
  19. long end = System.currentTimeMillis();
  20. System.out.println("发布200条,单条发布确认用时:" + (end - start)+"毫秒");
  21. }

2、批量发布确认

  1. //2、批量发布确认
  2. public static void publishBatchConfirm() throws Exception {
  3. Channel channel = RabbitMqUtil.getChannel();
  4. channel.queueDeclare("hello",true,false,false,null);
  5. //开启发布确认
  6. channel.confirmSelect();
  7. //开始时间
  8. long start = System.currentTimeMillis();
  9. for (Integer i = 0; i < MESSAGE_COUNT; i++) {
  10. String msg = i+"";
  11. channel.basicPublish("","hello",null,msg.getBytes());
  12. //消息批量发布确认
  13. if(MESSAGE_COUNT==i){
  14. channel.waitForConfirms();
  15. System.out.println( i + "--消息确认成功");
  16. }
  17. }
  18. //结束时间
  19. long end = System.currentTimeMillis();
  20. System.out.println("发布200条,批量发布确认用时:" + (end - start)+"毫秒");
  21. }

3、异步发布确认

  1. // 3、异步确认
  2. public static void publishAsyncConfirm() throws Exception {
  3. Channel channel = RabbitMqUtil.getChannel();
  4. channel.queueDeclare("hello",true,false,false,null);
  5. //开启发布确认
  6. channel.confirmSelect();
  7. /**
  8. * 声明一个线程安全有序的哈希表,适用于高并发的情况下
  9. * 1、可以轻松的将序号和数据进行关联
  10. * 2、轻松批量删除
  11. * 3、支持高并发(多线程)
  12. */
  13. ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
  14. //发布成功的消息的回调
  15. ConfirmCallback ackConfirm = ( deliveryTag, multiple)->{
  16. //拿到发布成功的消息
  17. ConcurrentNavigableMap<Long, String> concurrentNavigableMap = concurrentSkipListMap.headMap(deliveryTag);
  18. //从队列中删除发布成功的,剩下的即为发布不成功的
  19. concurrentSkipListMap.remove(deliveryTag);
  20. System.out.println("确认消息的标记-->" + deliveryTag);
  21. };
  22. //发送失败的消息的回调
  23. ConfirmCallback nackConfirm = ( deliveryTag, multiple)->{
  24. //拿到为发布成功的消息
  25. String msg = concurrentSkipListMap.get(deliveryTag);
  26. System.out.println("未发布成功的消息:" + msg);
  27. };
  28. //监听broker
  29. channel.addConfirmListener(ackConfirm,nackConfirm);
  30. //开始时间
  31. long start = System.currentTimeMillis();
  32. for (Integer i = 0; i < MESSAGE_COUNT; i++) {
  33. String msg = i+"";
  34. channel.basicPublish("","hello",null,msg.getBytes());
  35. /**
  36. * 将发送的消息放到 concurrentSkipListMap
  37. * concurrentSkipListMap的 key 是从channel中拿到的序列号
  38. * value 是需要发送的 msg
  39. */
  40. concurrentSkipListMap.put(channel.getNextPublishSeqNo(),msg);
  41. }
  42. //结束时间
  43. long end = System.currentTimeMillis();
  44. System.out.println("发布200条,异步发布确认用时:" + (end - start)+"毫秒");
  45. }

注:rabbitmq保证消息不丢失的措施:

1、队列持久化
2、消息持久化
3、发布确认(单条发布确认,批量发布确认,异步发布确认)

七、交换机 exchange

交换机类型:

1、fanout:发布订阅模式

广播模式,队列通过路由key将其和交换机绑定关系,生产者只需将消息发送到交换机,路由key便会将消息路由到对应的队列中,从而供消费者进行消费

2、direct: 路由模式

根据routingkey与队列的绑定关系,交换机将消息路由到对应的队列

3、topic 主题模式

该模式的routingkey不可以乱写,必须是一个单词列表,用点号隔开,例如:stu.test.author,该种单词列表最多不能超过255个字节,在该模式中,有两种替换符:
*:可以代替一个单词
#:可以代替零个或者多个单词

4、headers:已不常用

八、死信队列

1、产生原因:

1)消息TTL过期,即为消息存放时间过期
2)队列达到最大长度(队列存满了,无法再添加信息到mq)
3)消息被拒绝(basic.reject或basic.nack),并且reque=false

九、延迟队列

实现方式:

1、使用死信队列

  1. /**
  2. * 死信队列的延迟队列 配置类
  3. */
  4. @Configuration
  5. public class TtlQueueConfig {
  6. //普通交换机
  7. public static final String normal_exchange = "normal_exchange";
  8. //死信交换机
  9. public static final String death_exchange = "death_exchange";
  10. //普通队列A
  11. public static final String a_queue = "a_queue";
  12. //普通队列B
  13. public static final String b_queue = "b_queue";
  14. public static final String c_queue = "c_queue";
  15. //死信队列
  16. public static final String death_queue = "death_queue";
  17. @Bean("normal_exchange")
  18. public DirectExchange normalExchange(){
  19. return new DirectExchange(normal_exchange);
  20. }
  21. @Bean("death_exchange")
  22. public DirectExchange deathExchange(){
  23. return new DirectExchange(death_exchange);
  24. }
  25. //声明普通队列 ttl为10s
  26. @Bean("a_queue")
  27. public Queue aQueue(){
  28. Map<String,Object> arguments = new HashMap<>(3);
  29. //设置消息过期时间 ,消息发布者也可以设置消息过期时间,单位为ms
  30. arguments.put("x-message-ttl",10000);
  31. //设置正常队列的死信交换机
  32. arguments.put("x-dead-letter-exchange",death_exchange);
  33. //设置routing-key
  34. arguments.put("x-dead-letter-routing-key","Y");
  35. return QueueBuilder.durable(a_queue).withArguments(arguments).build();
  36. }
  37. //声明普通队列 ttl为40s
  38. @Bean("b_queue")
  39. public Queue bQueue(){
  40. Map<String,Object> arguments = new HashMap<>(3);
  41. //设置消息过期时间 ,消息发布者也可以设置消息过期时间,单位为ms
  42. arguments.put("x-message-ttl",40000);
  43. //设置正常队列的死信交换机
  44. arguments.put("x-dead-letter-exchange",death_exchange);
  45. //设置routing-key
  46. arguments.put("x-dead-letter-routing-key","Y");
  47. return QueueBuilder.durable(b_queue).withArguments(arguments).build();
  48. }
  49. //声明普通队列 消息发布者设置过期时间
  50. @Bean("c_queue")
  51. public Queue cQueue(){
  52. Map<String,Object> arguments = new HashMap<>(2);
  53. //设置正常队列的死信交换机
  54. arguments.put("x-dead-letter-exchange",death_exchange);
  55. //设置routing-key
  56. arguments.put("x-dead-letter-routing-key","Y");
  57. return QueueBuilder.durable(c_queue).withArguments(arguments).build();
  58. }
  59. //声明死信队列
  60. @Bean("death_queue")
  61. public Queue deathQueue(){
  62. return QueueBuilder.durable(death_queue).build();
  63. }
  64. //普通交换机与队列进行绑定
  65. @Bean
  66. public Binding aQueueBingDingNormalExchange(@Qualifier("a_queue")Queue queueA,
  67. @Qualifier("normal_exchange") DirectExchange exchange){
  68. return BindingBuilder.bind(queueA).to(exchange).with("XA");
  69. }
  70. @Bean
  71. public Binding bQueueBingDingNormalExchange(@Qualifier("b_queue")Queue queueB,
  72. @Qualifier("normal_exchange") DirectExchange exchange){
  73. return BindingBuilder.bind(queueB).to(exchange).with("XB");
  74. }
  75. @Bean
  76. public Binding cQueueBingDingNormalExchange(@Qualifier("c_queue")Queue queueB,
  77. @Qualifier("normal_exchange") DirectExchange exchange){
  78. return BindingBuilder.bind(queueB).to(exchange).with("XC");
  79. }
  80. //死信队列与交换机绑定
  81. @Bean
  82. public Binding deathQueueBingDingNormalExchange(@Qualifier("death_queue")Queue deathQueue,
  83. @Qualifier("death_exchange") DirectExchange exchange){
  84. return BindingBuilder.bind(deathQueue).to(exchange).with("Y");
  85. }
  86. }
  87. //发布消息:
  88. /**
  89. * 发送消息到延迟队列
  90. * @param message
  91. */
  92. @GetMapping("send/test")
  93. public void sendMsg(String message){
  94. rabbitTemplate.convertAndSend("normal_exchange","XA",message);
  95. rabbitTemplate.convertAndSend("normal_exchange","XB",message);
  96. System.out.println("发送成功");
  97. }

2、使用rabbitmq延迟中间件

  1. /**
  2. * 基于rabbitmq-delayed-message-exchange 插件实现的延迟队列
  3. */
  4. @Configuration
  5. public class DelayQueueConfig {
  6. //延迟交换机
  7. public static final String delayed_exchange = "delayed_exchange";
  8. //延迟队列
  9. public static final String delayed_queue = "delayed_queue";
  10. //延迟routing_key
  11. public static final String delayed_routing_key = "delayed_routing_key";
  12. //声明交换机 自定义x-delayed-exchange
  13. @Bean
  14. public CustomExchange customExchange(){
  15. /**
  16. * 交换机名称
  17. * 交换机类型
  18. * 是否持久化
  19. * 是否自动删除
  20. * 交换机参数
  21. */
  22. Map<String,Object> map = new HashMap<>();
  23. map.put("x-delayed-type","direct");
  24. return new CustomExchange(delayed_exchange,"x-delayed-message",true,false,map);
  25. }
  26. //声明队列
  27. @Bean
  28. public Queue delayedQueue(){
  29. return new Queue(delayed_queue);
  30. }
  31. //绑定交换机和队列
  32. @Bean
  33. public Binding delayedBingDingDelayedQueue(@Qualifier("delayedQueue") Queue queue ,
  34. @Qualifier("customExchange") CustomExchange exchange){
  35. return BindingBuilder.bind(queue).to(exchange).with(delayed_routing_key).noargs();
  36. }
  37. }

使用死信的缺点

即为死信队列,生产者发送消息时设定过期时间的话,如果第一条消息的过期时间很长,第二条消息的过期时间很短,rabbitmq会等第一条消息过期后丢入到死信队列,然后再检查第二条消息的过期时间,从而导致过期时间短的消息晚被丢入到死信队列

解决方案:

1、使用rabbitmq的延迟插件:

rabbitmq-delayed-message-exchange

2、插件下载地址:

Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

3、将下载下来的.ez文件cp到plugins文件夹下,

4、开启该插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

5、重启rabbitmq

6、查看是否安装成功

如果交换机type有x-delayed-message表示安装成功

十、消息确认

1、交换机确认消息回调

2、消息为达到目的地,进行回退

  1. /**
  2. * 交换机确认消息的回调
  3. * 交换机确认回调时 ,需要在配置文件中添加
  4. * spring.rabbitmq.publisher-confirm-type: correlated 默认为 none 不开启
  5. * 消息回退回调
  6. * spring.rabbitmq.publisher-returns: true
  7. */
  8. @Slf4j
  9. @Component
  10. public class ConfirmConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
  11. @Autowired
  12. private RabbitTemplate rabbitTemplate;
  13. /**
  14. * 由于RabbitTemplate.ConfirmCallback 是rabbitmqTemplate的内部方法,所以需要初始化
  15. */
  16. @PostConstruct //该注解是在其他注解加载完成后才使用的
  17. public void init(){
  18. rabbitTemplate.setConfirmCallback(this);
  19. rabbitTemplate.setReturnCallback(this);
  20. }
  21. /**
  22. * 参数说明
  23. *
  24. * @param correlationData 发送的消息 这个是从消息发布者那里发布消息到交换机时添加的
  25. * @param b 交换机是否接收到消息
  26. * @param s 失败的原因
  27. */
  28. @Override
  29. public void confirm(CorrelationData correlationData, boolean b, String s) {
  30. String id = correlationData != null ? correlationData.getId() : "";
  31. if (b) {
  32. log.info("交换机已接收到id为:{}的消息", id);
  33. }else{
  34. log.info("交换机未接收到id为:{}的消息",id);
  35. }
  36. }
  37. /**
  38. * 回退消息,只有消息没有到达目的地的时候会触发
  39. * @param
  40. */
  41. @Override
  42. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  43. log.error("回退消息:{}",message);
  44. }
  45. }

十一、备份交换机

正常声明交换机时,指定备份交换机,当消息不能正常到达正常交换机时,会将消息发送到备份交换机

  1. @Bean("normal_exchange")
  2. public DirectExchange deathExchange(){
  3. return ExchangeBuilder.directExchange(normal_exchange).withArgument("alternate","bakup_exchange").build();
  4. }

十二、rabbitmq的幂等性

原因:由于网络原因,导致用户重复消费,从而产生问题
解决方案:最优使用redis的原子性

十三、优先级队列

1、先设置优先级队列

  1. Map<String,Object> map = new HashMap<>();
  2. map.put("x-max-priority",10);//官方允许范围0-255
  3. channel.queueDeclare("test_queue", true, false, false, map);

2、生产者发送消息时,给消息设置优先级:

  1. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
  2. channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());

3、消费者

消费者拿到消息后会先对消息进行优先级排队,然后在进行消费

十四、惰性队列

1、惰性队列的消息保存在内存中还是磁盘中呢

正常情况下:保存在内存中
惰性队列:会将消息保存到磁盘上

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号