当前位置:   article > 正文

rabbitmq动态创建queue和监听queue_springboot动态创建rabbitmq队列监听

springboot动态创建rabbitmq队列监听

目录

一、pom.xml添加如下依赖

二、整合rabbitmq

(1)在application.properties中添加mq配置

(2)rabbitmqConfig工具类

(3)消费者


一、pom.xml添加如下依赖

  <!-- Dependencies to add inside the <dependencies> element of pom.xml -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <!-- RabbitMQ (Spring AMQP) support -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
  </dependency>
  <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>1.3.2</version>
  </dependency>
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
  </dependency>
  <!--
    fastjson parses message bodies received from the broker. Releases up to and including
    1.2.47 are affected by known autoType deserialization vulnerabilities, so use a patched
    1.2.x release.
  -->
  <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
  </dependency>

二、整合rabbitmq

(1)在application.properties中添加mq配置

  # RabbitMQ connection settings. The address accepts either a single host:port
  # or a comma-separated list of host:port pairs.
  mq.rabbit.address=192.168.1.1:5672,192.168.1.2:5672
  mq.rabbit.virtualHost=/
  # NOTE(review): RabbitMQ's built-in guest account is normally limited to localhost
  # connections; confirm a dedicated user is configured when the broker is remote.
  mq.rabbit.username=guest
  mq.rabbit.password=guest
  mq.rabbit.exchange.name=mq.direct
  # Number of concurrent consumers started by the listener container
  mq.concurrent.consumers=4
  # Maximum number of unacknowledged deliveries handed to each consumer (prefetch)
  mq.prefetch.count=100

(2)rabbitmqConfig工具类

  1. @Configuration
  2. public class RabbitConfig {
  3. @Value("${mq.rabbit.address}")
  4. String address;
  5. @Value("${mq.rabbit.username}")
  6. String username;
  7. @Value("${mq.rabbit.password}")
  8. String password;
  9. @Value("${mq.rabbit.virtualHost}")
  10. String mqRabbitVirtualHost;
  11. @Value("${mq.rabbit.exchange.name}")
  12. String exchangeName;
  13. @Value("${mq.concurrent.consumers}")
  14. int concurrentConsumers;
  15. @Value("${mq.prefetch.count}")
  16. int prefetchCount;
  17. //创建mq连接
  18. @Bean(name = "connectionFactory")
  19. public ConnectionFactory connectionFactory() {
  20. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  21. connectionFactory.setUsername(username);
  22. connectionFactory.setPassword(password);
  23. connectionFactory.setVirtualHost(mqRabbitVirtualHost);
  24. connectionFactory.setPublisherConfirms(true);
  25. //该方法配置多个host,在当前连接host down掉的时候会自动去重连后面的host
  26. connectionFactory.setAddresses(address);
  27. return connectionFactory;
  28. }
  29. @Bean("exchangeName")
  30. public Exchange exchangeName(){
  31. //Map<String,Object> args=new HashMap<>();
  32. //args.put("x-delayed-type","direct");
  33. //return new CustomExchange(exchangeName,"x-delayed-message",true,false,args);
  34. return new DirectExchange(exchangeName,true,false);
  35. }
  36. //监听处理类
  37. @Bean
  38. @Scope("prototype")
  39. public HandleService handleService() {
  40. return new HandleService();
  41. }
  42. //动态创建queue,命名为:hostName.queue1【192.168.1.1.queue1】,并返回数组queue名称
  43. @Bean
  44. public String[] mqMsgQueues() throws AmqpException, IOException {
  45. String[] queueNames = new String[queueSize];
  46. String hostName = OsUtil.getHostNameForLiunx();//获取hostName
  47. for (int i = 1; i <= 10; i++) {
  48. String queueName = String.format("%s.queue%d", hostName, i);
  49. connectionFactory().createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);
  50. connectionFactory().createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);
  51. queueNames[i - 1] = queueName;
  52. }
  53. return queueNames;
  54. }
  55. //创建监听器,监听队列
  56. @Bean
  57. public SimpleMessageListenerContainer mqMessageContainer(HandleService handleService) throws AmqpException, IOException {
  58. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
  59. container.setQueueNames(mqMsgQueues());
  60. container.setExposeListenerChannel(true);
  61. container.setPrefetchCount(prefetchCount);//设置每个消费者获取的最大的消息数量
  62. container.setConcurrentConsumers(concurrentConsumers);//消费者个数
  63. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式为手工确认
  64. container.setMessageListener(handleService);//监听处理类
  65. return container;
  66. }
  67. }

(3)消费者

  1. @Service
  2. public class HandleService implements ChannelAwareMessageListener {
  3. private static final Logger logger = LoggerFactory.getLogger(HandleService.class);
  4. /**
  5. * @param
  6. * 1、处理成功,这种时候用basicAck确认消息;
  7. * 2、可重试的处理失败,这时候用basicNack将消息重新入列;
  8. * 3、不可重试的处理失败,这时候使用basicNack将消息丢弃。
  9. *
  10. * basicNack(long deliveryTag, boolean multiple, boolean requeue)
  11. * deliveryTag:该消息的index
  12. * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
  13. * requeue:被拒绝的是否重新入队列
  14. */
  15. @Override
  16. public void onMessage(Message message, Channel channel) throws Exception {
  17. byte[] body = message.getBody();
  18. logger.info("接收到消息:" + new String(body));
  19. JSONObject jsonObject = null;
  20. try {
  21. jsonObject = JSONObject.parseObject(new String(body));
  22. if (消费成功) {
  23. logger.info("消息消费成功");
  24. channel.basicAck(message.getMessagePropertites().getDeliveryTag(),false);//确认消息消费成功
  25. }else if(可重试的失败处理){
  26. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  27. } else { //消费失败
  28. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  29. } catch (JSONException e) {
  30. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//消息丢弃
  31. logger.error("This message:" + jsonObject + " conversion JSON error ");
  32. }
  33. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/688496
推荐阅读
相关标签
  

闽ICP备14008679号