当前位置:   article > 正文

Spring Boot整合RabbitMQ,监听队列获取消息_rabbitmq,无法从被监听队列获取信息

rabbitmq,无法从被监听队列获取信息

Spring Boot整合RabbitMQ,监听队列获取消息

1、pom.xml添加如下依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2、整合rabbitMQ

  1. 1、首先建MQConn实体类
  2. /**
  3. * rabbitMQ的实体类ip、端口号、用户名、密码
  4. *
  5. **/
  6. public class MQConn {
  7. private String host;
  8. private int port;
  9. private String username;
  10. private String password;
  11. private long connectionTimeout;
  12. private CachingConnectionFactory.CacheMode cacheMode;
  13. private int channelCacheSize;
  14. private long channelCheckoutTimeout;
  15. 省略getter、setter
  16. }
  17. 2、建立配置文件rabbitmq.properties
  18. file.rabbitmq.host=192.168.203.136
  19. file.rabbitmq.port=5672
  20. file.rabbitmq.username=chai
  21. file.rabbitmq.password=123456
  22. file.rabbitmq.connectionTimeout=3000
  23. 3、注入参数RabbitConfig
  24. @Configuration
  25. @PropertySource("classpath:rabbitmq.properties")
  26. public class RabbitMQConfig {
  27. // 注入到实体MQConn
  28. @Bean("mqConn")
  29. @ConfigurationProperties(prefix = "file.rabbitmq")
  30. public MQConn fileWorkRabbitFactory(){
  31. MQConn mqConn=new MQConn();
  32. return mqConn;
  33. }
  34. @Bean("cachingConnectionFactory")
  35. public CachingConnectionFactory connectionFactory(
  36. @Qualifier("mqConn") MQConn mqConnFile
  37. ){
  38. CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
  39. cachingConnectionFactory.setHost(mqConnFile.getHost());
  40. cachingConnectionFactory.setPort(mqConnFile.getPort());
  41. cachingConnectionFactory.setUsername(mqConnFile.getUsername());
  42. cachingConnectionFactory.setPassword(mqConnFile.getPassword());
  43. cachingConnectionFactory.setVirtualHost("/");
  44. cachingConnectionFactory.setPublisherConfirms(true);
  45. if(mqConnFile.getCacheMode()!=null){
  46. cachingConnectionFactory.setCacheMode(mqConnFile.getCacheMode());
  47. }
  48. if(mqConnFile.getChannelCacheSize()>0){
  49. cachingConnectionFactory.setChannelCacheSize(mqConnFile.getChannelCacheSize());
  50. }
  51. if(mqConnFile.getChannelCheckoutTimeout()>0){
  52. cachingConnectionFactory.setChannelCheckoutTimeout(mqConnFile.getChannelCheckoutTimeout());
  53. }
  54. return cachingConnectionFactory;
  55. }
  56. @Bean
  57. @Scope("prototype")
  58. public PackMsgReceiver packMsgReceiver(){
  59. return new PackMsgReceiver();
  60. }
  61. /**
  62. * 创建监听器,监听队列
  63. * @param packMsgReceiver 监听方法
  64. * @return 监听器
  65. */
  66. @Bean
  67. public SimpleMessageListenerContainer messageListenerContainer(PackMsgReceiver packMsgReceiver, @Qualifier("cachingConnectionFactory") CachingConnectionFactory cachingConnectionFactory){
  68. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
  69. // 监听队列的名称
  70. container.setQueueNames("hi");
  71. container.setExposeListenerChannel(true);
  72. // 设置每个消费者获取的最大消息数量
  73. container.setPrefetchCount(100);
  74. // 消费者的个数
  75. container.setConcurrentConsumers(1);
  76. // 设置确认模式为手工确认
  77. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  78. container.setMessageListener(packMsgReceiver);
  79. return container;
  80. }
  81. }
  82. 4、接收消息处理
  83. @Component
  84. public class PackMsgReceiver implements ChannelAwareMessageListener {
  85. private static final Logger logger = LoggerFactory.getLogger(PackMsgReceiver.class);
  86. /**
  87. * @param
  88. * 1、处理成功,这种时候用basicAck确认消息;
  89. * 2、可重试的处理失败,这时候用basicNack将消息重新入列;
  90. * 3、不可重试的处理失败,这时候使用basicNack将消息丢弃。
  91. *
  92. * basicNack(long deliveryTag, boolean multiple, boolean requeue)
  93. * deliveryTag:该消息的index
  94. * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
  95. * requeue:被拒绝的是否重新入队列
  96. */
  97. @Override
  98. public void onMessage(Message message, Channel channel) throws Exception {
  99. byte[] body = message.getBody();
  100. logger.info("接收到消息:" + new String(body));
  101. JSONObject jsonObject = null;
  102. try {
  103. jsonObject = JSONObject.parseObject(new String(body));
  104. if (消费成功) {
  105. logger.info("消息消费成功");
  106. channel.basicAck(message.getMessagePropertites().getDeliveryTag(),false);//确认消息消费成功
  107. }else if(可重试的失败处理){
  108. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  109. } else { //消费失败
  110. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  111. } catch (JSONException e) {
  112. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//消息丢弃
  113. logger.error("This message:" + jsonObject + " conversion JSON error ");
  114. }
  115. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/688498
推荐阅读
相关标签
  

闽ICP备14008679号