当前位置:   article > 正文

Spring cloud集成Rabbitmq_springcloud整合rabbitmq

springcloud整合rabbitmq

1、配置pom

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. <version>2.2.0.RELEASE</version>
  5. </dependency>

2、yml配置

spring:
   #rabbit mq
   rabbitmq:

     host: 192.168.1.146
     port: 5672
     username: guest
     password: guest

3、连接、交换器、队列等设置

  1. @Configuration
  2. public class RabbitMqConfig {
  3. /**
  4. * 组件发布EXCHANGE
  5. */
  6. public static final String EXCHANGE_COMPONENT_PUBLISHED="exchange.component.published";
  7. public static final String EXCHANGE_COMPONENT_SYNCED="exchange.component.synced";
  8. public static final String EXCHANGE_EXTRACTION_TASK="exchange.extraction.task";
  9. /**
  10. * 组件发布消息队列
  11. */
  12. public static final String QUEUE_COMPONENT_PUBLISHED="queue.component.published";
  13. public static final String QUEUE_COMPONENT_SYNCED="queue.component.synced";
  14. public static final String QUEUE_EXTRACTION_TASK="queue.extraction.task";
  15. public static final String ROUTING_KEY_="";
  16. @Value("${spring.rabbitmq.host}")
  17. private String host;
  18. @Value("${spring.rabbitmq.port}")
  19. private int port;
  20. @Value("${spring.rabbitmq.username}")
  21. private String username;
  22. @Value("${spring.rabbitmq.password}")
  23. private String password;
  24. @Bean
  25. public ConnectionFactory connectionFactory() {
  26. CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
  27. connectionFactory.setUsername(username);
  28. connectionFactory.setPassword(password);
  29. connectionFactory.setVirtualHost("/");
  30. connectionFactory.setPublisherConfirms(true);
  31. return connectionFactory;
  32. }
  33. @Bean
  34. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
  35. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  36. factory.setConnectionFactory(connectionFactory);
  37. factory.setMessageConverter(new Jackson2JsonMessageConverter());
  38. return factory;
  39. }
  40. /**
  41. *
  42. * @return
  43. */
  44. @Bean
  45. public FanoutExchange exchangeComponentPublished()
  46. {
  47. return new FanoutExchange(EXCHANGE_COMPONENT_PUBLISHED);
  48. }
  49. @Bean
  50. public Queue queueComponentPublished()
  51. {
  52. return new Queue(QUEUE_COMPONENT_PUBLISHED,false);
  53. }
  54. @Bean
  55. public FanoutExchange exchangeComponentSynced()
  56. {
  57. return new FanoutExchange(EXCHANGE_COMPONENT_SYNCED);
  58. }
  59. @Bean
  60. public FanoutExchange exchangeExtractionTask()
  61. {
  62. return new FanoutExchange(EXCHANGE_EXTRACTION_TASK);
  63. }
  64. @Bean
  65. public Queue queueComponentSynced()
  66. {
  67. return new Queue(QUEUE_COMPONENT_SYNCED,false);
  68. }
  69. @Bean
  70. public Queue queueExtractionTask()
  71. {
  72. return new Queue(QUEUE_EXTRACTION_TASK,false);
  73. }
  74. @Bean
  75. public Binding componentSyncedBinding(){
  76. return BindingBuilder.bind(queueComponentSynced()).to(exchangeComponentSynced());
  77. }
  78. @Bean
  79. public Binding extractionTaskBinding(){
  80. return BindingBuilder.bind(queueExtractionTask()).to(exchangeExtractionTask());
  81. }
  82. @Bean
  83. public RabbitTemplate rabbitTemplate(ConnectionFactory factory){
  84. RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
  85. return rabbitTemplate;
  86. }
  87. }

需要定义交换器,队列,绑定,会自动注册。

4、生产者

  1. @Component
  2. public class ExtractionTaskMessageSender {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. public void send(ExtractionTaskMessage msg)
  6. {
  7. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  8. ObjectMapper mapper = new ObjectMapper();
  9. Message m = null;
  10. try {
  11. m = MessageBuilder.withBody(mapper.writeValueAsBytes(msg)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
  12. } catch (JsonProcessingException e) {
  13. e.printStackTrace();
  14. }
  15. rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_EXTRACTION_TASK,"",msg);
  16. }
  17. }

使用RabbitTemplate发送消息。

5、消费者

  1. @Component
  2. @RabbitListener(queues = RabbitMqConfig.QUEUE_EXTRACTION_TASK)
  3. public class ExtractionTaskMessageReceiver {
  4. private static Logger logger = LoggerFactory.getLogger(ExtractionTaskMessageReceiver.class);
  5. @Autowired
  6. private DataExtractorFactory dataExtractorFactory;
  7. @Autowired
  8. private ExtractionTaskService extractionTaskService;
  9. @Autowired
  10. private ExtractionDataService extractionDataService;
  11. @RabbitHandler
  12. public void process(@Payload ExtractionTaskMessage msg) {
  13. try {
  14. } catch (DataExtractionException e)
  15. {
  16. logger.error( "",e.getMessage());
  17. }
  18. catch (Exception e) {
  19. //异常处理
  20. logger.error(e.getMessage());
  21. }
  22. }
  23. }
使用 @RabbitListener 进行监听,@RabbitHandler定义消息处理方法。

在进行监听时,会查询所有@RabbitHandler注解的消息处理方法,如果没有参数类型匹配的方法,则异常。

@RabbitListener 注意

  • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

  • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

    • application/octet-stream:二进制字节数组存储,使用 byte[]
    • application/x-java-serialized-object:java 对象序列化格式存储。使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
    • text/plain:文本数据类型存储。使用 String
    • application/json:JSON 格式。使用 Object、相应类型

注意:@RabbitListener注解在类上或方法上,行为不一样。在类上,消费方法参数类型不可以设置为Message。在方法上,方法类型可以为Message

MessageConvert

  • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
  • RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等
  • 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
  • SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差
  • 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能

设置MessageConvert

Json格式

  1. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
  2. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  3. factory.setConnectionFactory(connectionFactory);
  4. factory.setMessageConverter(new Jackson2JsonMessageConverter());
  5. return factory;
  6. }

 自定义

  1. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
  2. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  3. factory.setConnectionFactory(connectionFactory);
  4. factory.setMessageConverter(new MessageConverter() {
  5. @Override
  6. public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
  7. return null;
  8. }
  9. @Override
  10. public Object fromMessage(Message message) throws MessageConversionException {
  11. try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))){
  12. return (User)ois.readObject();
  13. }catch (Exception e){
  14. e.printStackTrace();
  15. return null;
  16. }
  17. }
  18. });
  19. return factory;
  20. }

@Payload 与 @Headers

  • 使用 @Payload 和 @Headers 注解可以消息中的 body 与 headers 信息

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
    System.out.println("body:"+body);
    System.out.println("Headers:"+headers);
}

  • 也可以获取单个 Header 属性

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
    System.out.println("body:"+body);
    System.out.println("token:"+token);
}
 

通过 @RabbitListener 注解声明 Binding

  • 通过 @RabbitListener 的 bindings 属性声明 Binding(若 RabbitMQ 中不存在该绑定所需要的 Queue、Exchange、RouteKey 则自动创建,若存在则抛出异常)
  1. @RabbitListener(bindings = @QueueBinding(
  2. exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
  3. value = @Queue(value = "consumer_queue",durable = "true"),
  4. key = "key.#"
  5. ))
  6. public void processMessage1(Message message) {
  7. System.out.println(message);
  8. }

@RabbitListener 和 @RabbitHandler 搭配使用

  • @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
  • @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
  1. @Component
  2. @RabbitListener(queues = "consumer_queue")
  3. public class Receiver {
  4. @RabbitHandler
  5. public void processMessage1(String message) {
  6. System.out.println(message);
  7. }
  8. @RabbitHandler
  9. public void processMessage2(byte[] message) {
  10. System.out.println(new String(message));
  11. }
  12. }

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/925932
推荐阅读
相关标签
  

闽ICP备14008679号