当前位置:   article > 正文

(八)SpringBoot整合RabbitTemplate发送接受消息&序列化机制_rabbittemplate.send

rabbittemplate.send

前言 基于XML配置RabbitMQ

为了方便了解基于SpringBoot配置,我们首先熟悉在传统的Spring基于XML的配置,后续我们会更方便了解基于SpringBoot的配置

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.        xmlns:context="http://www.springframework.org/schema/context"
  5.        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6.        xsi:schemaLocation=
  7.                "http://www.springframework.org/schema/context
  8.           http://www.springframework.org/schema/context/spring-context.xsd
  9.           http://www.springframework.org/schema/beans
  10.           http://www.springframework.org/schema/beans/spring-beans.xsd
  11.           http://www.springframework.org/schema/rabbit
  12.           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  13.     <context:property-placeholder location="rabbitmq.properties"/>
  14.   <!--扫描 rabbit 包 自动声明交换器、队列、绑定关系-->
  15.     <context:component-scan base-package="com.jetsen.config.rabbit"/>
  16.     <!--1.声明连接工厂,在此我们设置rabbitmq的连接,用户名、密码、虚拟主机等信息-->
  17.     <rabbit:connection-factory id="connectionFactory"
  18.                                addresses="${rabbitmq.addresses}"
  19.                                username="${rabbitmq.username}"
  20.                                password="${rabbitmq.password}"
  21.                                virtual-host="${rabbitmq.virtualhost}"/>
  22.     <!--2.创建一个管理器(org.springframework.amqp.rabbit.core.RabbitAdmin),用于管理交换,队列和绑定。
  23.     auto-startup 指定是否自动声明上下文中的队列,交换和绑定, 默认值为 true。-->
  24.     <rabbit:admin connection-factory="connectionFactory" auto-startup="true" />
  25.    <!-- 3.消息对象json转换类 -->
  26.   <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
  27.     <!--4.声明 template 的时候需要声明 id 不然会抛出异常-->
  28.     <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory  message-converter="jsonMessageConverter" exchange="remoting.exchange"/>
  29.    <!--5.声明 队列-->
  30. <!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
  31. exclusive: 仅创建者可以使用的私有队列,断开后自动删除;
  32. auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
  33. <rabbit:queue name="remoting.queue" auto-declare="true" durable="true" auto-delete="false" exclusive="false"/>
  34.   <!--6.声明交换机、队列、绑定管理 -->
  35. <rabbit:direct-exchange name="remoting.exchange" durable="true" auto-delete="false">
  36.         <rabbit:bindings>
  37.             <rabbit:binding queue="remoting.queue" key="remoting.binding"/>
  38.         </rabbit:bindings>
  39.     </rabbit:direct-exchange>
  40.  <!-- 7.定义消费者,需实现implements MessageListener,实现 public void onMessage(Message msg)​​​​​​​方法-->
  41. <bean name="queuehandler" class="com.jetsen.config.rabbitmq.RecvHandler" />
  42. <!-- 配置监听acknowledeg="manual"设置手动应答,它能够保证即使在一个worker处理消息的时候用CTRL+C来杀掉这个worker,或者一个consumer挂了(channel关闭了、connection关闭了或者TCP连接断了),也不会丢失消息。
  43. 因为RabbitMQ知道没发送ack确认消息导致这个消息没有被完全处理,将会对这条消息做re-queue处理。如果此时有另一个consumer连接,消息会被重新发送至另一个consumer会一直重发,直到消息处理成功,监听容器acknowledge="auto" concurrency="30"设置发送次数,最多发送30次 -->
  44. <!-- 8.定义消费者监听队列 -->
  45.      <rabbit:listener-container
  46.          connection-factory="connectionFactory">
  47.          <rabbit:listener ref="queuehandler" queues="remoting.queue" />
  48.      </rabbit:listener-container>
  49. </beans>
  1. public class RecvHandler implements MessageListener {
  2. private static final ObjectMapper MAPPER = new ObjectMapper();
  3. public void onMessage(Message msg) {
  4. try {
  5. // msg就是rabbitmq传来的消息,需要的同学自己打印看一眼
  6. // 使用jackson解析
  7. JsonNode jsonData = MAPPER.readTree(msg.getBody());
  8. System.out.println("接收消息:+jsonData.toString());
  9. } catch (IOException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }

 当然消费者可以在JAVA代码中实现

  1. @Bean
  2. public Queue remotingQueue() {
  3. // 创建一个持久化的队列 1
  4. return new Queue("remoting.queue", true);
  5. }
  6. @Bean
  7. public SimpleMessageListenerContainer simpleMessageQueueLister(ConnectionFactory connectionFactory) {
  8. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  9. // 设置监听的队列
  10. container.setQueues(remotingQueue());
  11. // 指定要创建的并发使用者数。
  12. container.setConcurrentConsumers(1);
  13. // 设置消费者数量的上限
  14. container.setMaxConcurrentConsumers(5);
  15. // 设置是否自动签收消费 为保证消费被成功消费,建议手工签收
  16. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  17. container.setMessageListener(new ChannelAwareMessageListener() {
  18. @Override
  19. public void onMessage(Message message, Channel channel) throws Exception {
  20. // 可以在这个地方得到消息额外属性
  21. MessageProperties properties = message.getMessageProperties();
  22. //得到消息体内容
  23. byte[] body = message.getBody();
  24. System.out.println(remotingQueue().getName() + "收到消息:" + new String(body));
  25. /*
  26. * DeliveryTag 是一个单调递增的整数
  27. * 第二个参数 代表是否一次签收多条,如果设置为 true,则所有 DeliveryTag 小于该 DeliveryTag 的消息都会被签收
  28. */
  29. channel.basicAck(properties.getDeliveryTag(), false);
  30. }
  31. });
  32. return container;
  33. }

一、在POM文件中添加AMQP的依赖 

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

二、在主类添加@EnableRabbit 注解 

  自动配置

  1.  RabbitAutoConfiguration
  2.  有自动配置了连接工厂ConnectionFactory;
  3. RabbitProperties 封装了 RabbitMQ的配置
  4.  RabbitTemplate :给RabbitMQ发送和接受消息;
  5. AmqpAdmin : RabbitMQ系统管理功能组件;
  6.  AmqpAdmin:创建和删除 Queue,Exchange,Binding
  7.  @EnableRabbit +  @RabbitListener 监听消息队列的内容
  1. @EnableRabbit  //开启基于注解的RabbitMQ模式
  2. @ComponentScan("com.jetsen.mq")
  3. @SpringBootApplication
  4. public class SpringRabbitMqApplication {
  5.     public static void main(String[] args) {
  6.         SpringApplication.run(SpringRabbitMqApplication.class, args);
  7.     }
  8. }

三、配置连接信息

  1. spring.rabbitmq.host=127.0.0.1
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=rabbitadmin
  4. spring.rabbitmq.password=123456
  5. spring.rabbitmq.virtual-host=/vhost_rabbitmq

四、发送消息 

// Message需要自己构造一个;定义消息体内容和消息头
 // rabbitTemplate.send(exchange,routeKey,message);

// object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
// rabbitTemplate.convertAndSend(exchange,routeKey,object);默认采用的是JAVA的序列化

  1. @Override
  2. public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
  3. convertAndSend(exchange, routingKey, object, (CorrelationData) null);
  4. }

五、 接收消息

rabbitTemplate.receiveAndConvert("default.queue")

 

六、序列化机制

    private volatile MessageConverter messageConverter = new SimpleMessageConverter();

    在RabbitMQ中默认采用的序列化

SerializationUtils.deserialize(
                            createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));

     修改序列化机制

  1. @Configuration
  2. public class RabbitMQConfig {
  3. @Bean
  4. public MessageConverter messageConverter() {
  5. return new Jackson2JsonMessageConverter();
  6. }
  7. }

 

七、监听机制@RabbitListener

  1. @Component
  2. @RabbitListener(queues = "default.queue")
  3. public class RabbitMqConsumer {
  4. private static final Logger log= LoggerFactory.getLogger(RabbitMqConsumer.class);
  5. @RabbitHandler
  6. public void process(MessageBean messageBean,Channel channel, Message message) throws IOException {
  7. log.info("HelloReceiver收到 : " + messageBean +"收到时间"+DateUtil.formatDateByFormat(new Date(), "yyyyMMddHHmmss"));
  8. try {
  9. //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
  10. //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
  11. deliveryTag: 对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序(index)来表示,一条消息就为1
  12. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  13. log.info("receiver success");
  14. } catch (IOException e) {
  15. log.info("接收到消息之后的处理发生异常.", e);
  16. }
  17. }}

同时注解可以申明交换机、队列和Binding

  1. @RabbitListener(
  2. bindings = @QueueBinding(
  3. value = @Queue(value = "default.queue1", durable = "true"),
  4. exchange = @Exchange(value = "default.exchange1",
  5. durable = "true",
  6. type = "topic",
  7. ignoreDeclarationExceptions = "true"),
  8. key = "default_routing_key.#")
  9. )

申明后会自动创建队列,交换机和Binding

收消息方法可以有两种

1、使用实体 

  1. @RabbitListener(queues = "default.queue")
  2. @RabbitHandler
  3. public void process(MessageBean messageBean, Channel channel, Message message) throws IOException {
  4. log.info("HelloReceiver收到【MessageBean】 : " + messageBean + "收到时间"
  5. + DateUtil.formatDateByFormat(new Date(), "yyyyMMddHHmmss"));
  6. try {
  7. // 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
  8. // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
  9. deliveryTag:
  10. // 对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序(index)来表示,一条消息就为1
  11. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  12. log.info("receiver success");
  13. } catch (IOException e) {
  14. log.info("接收到消息之后的处理发生异常.", e);
  15. }
  16. }

2、使用Message

  1. @RabbitListener(queues = "default.queue")
  2. @RabbitHandler
  3. public void process(Channel channel, Message message) throws IOException {
  4. byte[] body = message.getBody();
  5. String messageStr = new String(body,"UTF-8");
  6. log.info("HelloReceiver收到【Message】 : " + messageStr + "收到时间"
  7. + DateUtil.formatDateByFormat(new Date(), "yyyyMMddHHmmss"));
  8. try {
  9. // 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
  10. // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
  11. deliveryTag:
  12. // 对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序(index)来表示,一条消息就为1
  13. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  14. log.info("receiver success");
  15. } catch (IOException e) {
  16. log.info("接收到消息之后的处理发生异常.", e);
  17. }
  18. }

 

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号