当前位置:   article > 正文

springboot整合rabbitMQ及手动ack确认_rocketmq手动回复ack

rocketmq手动回复ack

springboot版2.2.2

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.ex</groupId>
  7. <artifactId>tmap2</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <parent>
  10. <groupId>org.springframework.boot</groupId>
  11. <artifactId>spring-boot-starter-parent</artifactId>
  12. <version>2.2.2.RELEASE</version>
  13. <relativePath/> <!-- lookup parent from repository -->
  14. </parent>
  15. <dependencies>
  16. <dependency>
  17. <groupId>org.springframework.boot</groupId>
  18. <artifactId>spring-boot-starter-web</artifactId>
  19. </dependency>
  20. <!-- <dependency>-->
  21. <!-- <groupId>io.agora.rtm</groupId>-->
  22. <!-- <artifactId>agora-rtm-sdk</artifactId>-->
  23. <!-- <version>1.3</version>-->
  24. <!-- </dependency>-->
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-amqp</artifactId>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.projectlombok</groupId>
  31. <artifactId>lombok</artifactId>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.springframework.boot</groupId>
  35. <artifactId>spring-boot-starter-test</artifactId>
  36. <scope>test</scope>
  37. </dependency>
  38. </dependencies>
  39. </project>

application.yml
 

  1. spring:
  2. rabbitmq:
  3. host: localhost
  4. port: 5672
  5. username: admin
  6. password: 123456
  7. virtual-host: /ems
  8. template:
  9. retry: #重试,消息发送失败会重试
  10. enabled: true # 开启重试
  11. initial-interval: 10000ms #第一次十秒重试
  12. max-interval: 80000ms #最后一次是八秒重试
  13. multiplier: 2 #重试翻倍率
  14. publisher-confirms: true #发送者开启 confirm 确认机制
  15. publisher-returns: true # 发送者开启 return 确认机制




RabbitCallbackConfig.java

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Slf4j
  9. @Configuration
  10. public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  11. @Bean
  12. RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  13. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  14. //rabbitTemplate发送消息json转换配置
  15. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  16. rabbitTemplate.setMandatory(true);
  17. rabbitTemplate.setConfirmCallback(this);
  18. rabbitTemplate.setReturnCallback(this);
  19. return rabbitTemplate;
  20. }
  21. /**
  22. * 配置接收消息json转换为对象
  23. * @return
  24. */
  25. @Bean
  26. public MessageConverter jsonMessageConverter(){
  27. return new Jackson2JsonMessageConverter();
  28. }
  29. // 下边这样写也可以
  30. // @Autowired
  31. // private RabbitTemplate rabbitTemplate;
  32. // @PostConstruct
  33. // public void init() {
  34. // rabbitTemplate.setMandatory(true);
  35. // rabbitTemplate.setReturnCallback(this);
  36. // rabbitTemplate.setConfirmCallback(this);
  37. // }
  38. @Override
  39. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  40. if (!ack) {
  41. log.error("confirm==>发送到broker.exchange失败\r\n" +
  42. "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
  43. correlationData, ack, cause);
  44. } else {
  45. log.info("confirm==>发送到broker.exchange成功\r\n" +
  46. "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
  47. correlationData, ack, cause);
  48. }
  49. }
  50. @Override
  51. public void returnedMessage(Message message, int replyCode, String replyText,
  52. String exchange, String routingKey) {
  53. log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
  54. "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
  55. message, replyCode, replyText, exchange, routingKey);
  56. }
  57. }

WorkCustomer.java

  1. import com.rabbitmq.client.Channel;
  2. import lombok.SneakyThrows;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.Exchange;
  5. import org.springframework.amqp.rabbit.annotation.Queue;
  6. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.amqp.support.AmqpHeaders;
  9. import org.springframework.messaging.handler.annotation.Header;
  10. import org.springframework.stereotype.Component;
  11. import java.io.IOException;
  12. @Slf4j
  13. @Component
  14. public class WorkCustomer {
  15. @RabbitListener(queuesToDeclare = @Queue(value = "hello",declare = "true"),ackMode ="MANUAL" )
  16. @SneakyThrows
  17. public void receive12(Student student, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
  18. try {
  19. log.info("message: {}",student);
  20. // 处理实际业务
  21. // 制造异常
  22. // int wrongNumber = 1/0;
  23. // 无异常,确认消息消费成功
  24. channel.basicAck(deliveryTag, true);
  25. }catch (IOException | ArithmeticException exception) {
  26. log.error("处理消息发生异常", exception);
  27. // 有异常,将消息返回给Queue里,第三个参数requeue可以直接看出来,是否返回到Queue中
  28. channel.basicNack(deliveryTag, true, true);
  29. }
  30. }
  31. //-----------------工作模式---------------------------------------
  32. // 生产端没有指定交换机只有routingKey和Object。
  33. //消费方产生work队列,放在默认的交换机(AMQP default)上。
  34. //而默认的交换机有一个特点,只要你的routerKey的名字与这个
  35. //交换机的队列有相同的名字,他就会自动路由上。
  36. //生产端routingKey 叫work ,消费端生产work队列。
  37. //他们就路由上了
  38. @RabbitListener(queuesToDeclare = @Queue("work"))
  39. public void receive1(String message){
  40. System.out.println("work message1 = " + message);
  41. }
  42. @RabbitListener(queuesToDeclare = @Queue("work"))
  43. public void receive2(String message){
  44. System.out.println("work message2 = " + message);
  45. }
  46. //-------------------------广播模式--------------------------------------------------
  47. @RabbitListener(bindings = @QueueBinding(
  48. value = @Queue, // 创建临时队列
  49. exchange = @Exchange(name = "logs", type = "fanout")
  50. ))
  51. public void fanout1(String message) {
  52. System.out.println("message1 = " + message);
  53. }
  54. @RabbitListener(bindings = @QueueBinding(
  55. value = @Queue, // 创建临时队列
  56. exchange = @Exchange(name = "logs", type = "fanout")
  57. ))
  58. public void fanout2(String message) {
  59. System.out.println("message2 = " + message);
  60. }
  61. //-------------------------路由模式--------------------------------------------------
  62. @RabbitListener(bindings = {
  63. @QueueBinding(
  64. value = @Queue(value = "infoQue",declare = "true"), // 创建info队列,declare默认队列持久化
  65. key = {"info"}, // 路由key
  66. exchange = @Exchange(type = "direct", name = "directs")
  67. )})
  68. public void receive1221(String message) {
  69. System.out.println("message1 = " + message);
  70. }
  71. @RabbitListener(bindings = {
  72. @QueueBinding(
  73. value = @Queue(value = "info||error"), // 创建临时队列
  74. key = {"info", "error"}, // 路由key
  75. exchange = @Exchange(type = "direct", name = "directs")
  76. )})
  77. public void receive11(String message) {
  78. System.out.println("message1 = " + message);
  79. }
  80. @RabbitListener(bindings = {
  81. @QueueBinding(
  82. value = @Queue,
  83. key = {"error"},
  84. exchange = @Exchange(type = "direct", name = "directs")
  85. )})
  86. public void receive22(String message) {
  87. System.out.println("message2 = " + message);
  88. }
  89. //-------------------------Topic 订阅模型(动态路由模型)--------------------------------------------------
  90. @RabbitListener(bindings = {
  91. @QueueBinding(
  92. value = @Queue,
  93. key = {"user.*"},
  94. exchange = @Exchange(type = "topic",name = "topics")
  95. )
  96. })
  97. public void receive111(String message){
  98. System.out.println("message1 = " + message);
  99. }
  100. @RabbitListener(bindings = {
  101. @QueueBinding(
  102. value = @Queue,
  103. key = {"user.#"},
  104. exchange = @Exchange(type = "topic",name = "topics")
  105. )
  106. })
  107. public void receive222(String message){
  108. System.out.println("message2 = " + message);
  109. }
  110. /**
  111. * 监听Queue的时候,直接获取消息体.
  112. * 在注解上开启手动确认, 必须是ackMode的大写.
  113. * 在进行消息确认的时候,要带上Rabbit MQ Server发送过来头上的tag,可以通过@Header注解获取delivery tag,
  114. * @param firstTopicQueueMessage 消息体
  115. * @param channel Broker和Consumer建立的channel
  116. * @param tag 消息头中的tag
  117. */
  118. // @RabbitListener(queues = "${queue.topic.first}", ackMode = "MANUAL")
  119. // @RabbitHandler
  120. // @SneakyThrows
  121. // public void receiveFirstTopicQueueMessage(String firstTopicQueueMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
  122. // log.info("This is firstTopicQueue received message: {}", firstTopicQueueMessage);
  123. // try {
  124. // // 处理实际业务
  125. // TimeUnit.SECONDS.sleep(5);
  126. // // 制造异常
  127. // // int wrongNumber = 1/0;
  128. // // 无异常,确认消息消费成功
  129. // channel.basicAck(tag, true);
  130. // }catch (IOException | ArithmeticException exception) {
  131. // log.error("处理消息发生异常", exception);
  132. // // 有异常,将消息返回给Queue里,第三个参数requeue可以直接看出来,是否返回到Queue中
  133. // channel.basicNack(tag, true, true);
  134. // }
  135. // }
  136. }

TTest.java

  1. import org.junit.Test;
  2. import org.junit.runner.RunWith;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.test.context.junit4.SpringRunner;
  7. @RunWith(SpringRunner.class)
  8. @SpringBootTest
  9. public class TTest {
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. //5.2 第一种hello world模型使用
  13. @Test
  14. public void test1() {
  15. Student student = new Student();
  16. student.setName("小明");
  17. student.setAge("18");
  18. student.setAddress("杭州");
  19. rabbitTemplate.convertAndSend("hello", student);
  20. // 生产端没有指定交换机只有routingKey和Object。
  21. //消费方产生hello队列,放在默认的交换机(AMQP default)上。
  22. //而默认的交换机有一个特点,只要你的routerKey的名字与这个
  23. //交换机的队列有相同的名字,他就会自动路由上。
  24. //生产端routingKey 叫hello ,消费端生产hello队列。
  25. //他们就路由上了
  26. }
  27. //5.3 第二种work模型使用
  28. @Test
  29. public void test2() {
  30. for (int i = 0; i < 10; i++) {
  31. rabbitTemplate.convertAndSend("work", "hello work!");
  32. // 生产端没有指定交换机只有routingKey和Object。
  33. //消费方产生work队列,放在默认的交换机(AMQP default)上。
  34. //而默认的交换机有一个特点,只要你的routerKey的名字与这个
  35. //交换机的队列有相同的名字,他就会自动路由上。
  36. //生产端routingKey 叫work ,消费端生产work队列。
  37. //他们就路由上了
  38. }
  39. }
  40. // 5.4 Fanout 广播模型
  41. @Test
  42. public void test3() {
  43. rabbitTemplate.convertAndSend("logs", "", "这是日志广播"); // 参数1为交换机,参数2为路由key,“”表示为任意路由,参数3为消息内容
  44. }
  45. //5.5 Route 路由模型
  46. @Test
  47. public void contextLoads() {
  48. // rabbitTemplate.convertAndSend("directs", "error", "error 的日志信息");
  49. rabbitTemplate.convertAndSend("directs", "info", "info 的日志信息");
  50. }
  51. //5.6 Topic 订阅模型(动态路由模型)
  52. @Test
  53. public void contextLoads1() {
  54. rabbitTemplate.convertAndSend("topics", "user.save.findAll", "user.save.findAll 的消息");
  55. }
  56. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/127113
推荐阅读
相关标签
  

闽ICP备14008679号