当前位置:   article > 正文

SpringBoot项目中连接两个RabbitMq_spring boot rabbitmq异常后连接到新服务器

spring boot rabbitmq异常后连接到新服务器

今天在写项目的时候遇到新需求,一个mq的功能要使用我们公司的服务器的mq,一个mq的功能要使用部署的那边的服务器的mq,话不多说直接上代码。

配置文件application.yml:

  1. spring:
  2. rabbitmq:
  3. yjdpeservice:
  4. host: xxx.xxx.xxx.xxx
  5. port: 5672
  6. username: admin
  7. password: admin
  8. yjservice:
  9. host: xxx.xxx.xxx.xxx
  10. port: 5672
  11. username: admin
  12. password: admin

配置类:

  1. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  2. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Qualifier;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import org.springframework.context.annotation.Primary;
  11. @Configuration
  12. public class RabbitPlusConfig {
  13. @Bean(name="mergeConnectionFactory")
  14. @Primary
  15. public ConnectionFactory MergeConnectionFactory(
  16. @Value("${spring.rabbitmq.yjdpeservice.host}") String host,
  17. @Value("${spring.rabbitmq.yjdpeservice.port}") int port,
  18. @Value("${spring.rabbitmq.yjdpeservice.username}") String username,
  19. @Value("${spring.rabbitmq.yjdpeservice.password}") String password
  20. ){
  21. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  22. connectionFactory.setHost(host);
  23. connectionFactory.setPort(port);
  24. connectionFactory.setUsername(username);
  25. connectionFactory.setPassword(password);
  26. return connectionFactory;
  27. }
  28. @Bean(name="LocalConnectionFactory")
  29. public ConnectionFactory LocalConnectionFactory(
  30. @Value("${spring.rabbitmq.yjservice.host}") String host,
  31. @Value("${spring.rabbitmq.yjservice.port}") int port,
  32. @Value("${spring.rabbitmq.yjservice.username}") String username,
  33. @Value("${spring.rabbitmq.yjservice.password}") String password
  34. ){
  35. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  36. connectionFactory.setHost(host);
  37. connectionFactory.setPort(port);
  38. connectionFactory.setUsername(username);
  39. connectionFactory.setPassword(password);
  40. return connectionFactory;
  41. }
  42. @Bean(name="mergeRabbitTemplate")
  43. @Primary
  44. public RabbitTemplate mergeRabbitTemplate(
  45. @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory
  46. ){
  47. RabbitTemplate yjdpRabbitTemplate = new RabbitTemplate(connectionFactory);
  48. return yjdpRabbitTemplate;
  49. }
  50. @Bean(name="LocalRabbitTemplate")
  51. public RabbitTemplate LocalRabbitTemplate(
  52. @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory
  53. ){
  54. RabbitTemplate yjRabbitTemplate = new RabbitTemplate(connectionFactory);
  55. return yjRabbitTemplate;
  56. }
  57. @Bean(name="mergeFactory")
  58. public SimpleRabbitListenerContainerFactory mergeFactory(
  59. SimpleRabbitListenerContainerFactoryConfigurer configurer,
  60. @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory
  61. ) {
  62. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  63. configurer.configure(factory, connectionFactory);
  64. return factory;
  65. }
  66. @Bean(name="LocalFactory")
  67. public SimpleRabbitListenerContainerFactory LocalFactory(
  68. SimpleRabbitListenerContainerFactoryConfigurer configurer,
  69. @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory
  70. ) {
  71. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  72. configurer.configure(factory, connectionFactory);
  73. return factory;
  74. }
  75. }

发送消息类:

  1. import io.renren.common.utils.R;
  2. import io.swagger.annotations.Api;
  3. import io.swagger.annotations.ApiOperation;
  4. import org.springframework.amqp.AmqpException;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.core.MessagePostProcessor;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Qualifier;
  10. import org.springframework.web.bind.annotation.*;
  11. @Api(tags = "测试双mq发送")
  12. @RestController
  13. @RequestMapping("rbt/mq")
  14. public class RbtMqController {
  15. @Autowired
  16. @Qualifier(value = "mergeRabbitTemplate")
  17. private RabbitTemplate mergerabbitTemplate;
  18. @Autowired
  19. @Qualifier(value = "LocalRabbitTemplate")
  20. private RabbitTemplate LocalrabbitTemplate;
  21. @ApiOperation("测试发送mq")
  22. @PostMapping("/PostMq/{mqone}/{mqtwo}")
  23. public Object PostMq(@RequestParam("token") String token, @PathVariable String mqone, @PathVariable String mqtwo){
  24. mergerabbitTemplate.convertAndSend("CeshiQueue", (Object) mqone, new MessagePostProcessor() {
  25. @Override
  26. public Message postProcessMessage(Message message) throws AmqpException {
  27. long l = 40000;
  28. //设置定时发布的时间发送到延时队列 到时间后转交给死信队列
  29. message.getMessageProperties().setExpiration(String.valueOf(l));
  30. return message;
  31. }
  32. });
  33. String msgTwo = "success";
  34. LocalrabbitTemplate.convertAndSend("CeshiQueue", (Object) mqtwo, new MessagePostProcessor() {
  35. @Override
  36. public Message postProcessMessage(Message message) throws AmqpException {
  37. long l = 40000;
  38. //设置定时发布的时间发送到延时队列 到时间后转交给死信队列
  39. message.getMessageProperties().setExpiration(String.valueOf(l));
  40. return message;
  41. }
  42. });
  43. return R.ok();
  44. }
  45. }

消费者端:

消费LocalFactory对应的mq中的my-dlx-queue-Ceshi

  1. @Component
  2. @RabbitListener(queues = "my-dlx-queue-Ceshi",containerFactory = "LocalFactory")
  3. @Log4j2
  4. public class locallistener {
  5. @RabbitHandler
  6. public void RegularlyAddAsCheckIn(String msg) throws Exception {
  7. log.info(new Date() + "::LocalFactory收到信息::" + msg);
  8. }
  9. }

消费mergeFactoryFactory对应的mq中的my-dlx-queue-Ceshi

  1. @Component
  2. @RabbitListener(queues = "my-dlx-queue-Ceshi",containerFactory = "mergeFactory")
  3. @Log4j2
  4. public class mergerlistener {
  5. @RabbitHandler
  6. public void RegularlyAddAsCheckIn(String msg) throws Exception {
  7. log.info(new Date() + "::mergeFactory收到信息::" + msg);
  8. }
  9. }

完事,实测有效。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/617069
推荐阅读
相关标签
  

闽ICP备14008679号