当前位置:   article > 正文

spring整合rabbitMQ最新版_spring-rabbit

spring-rabbit

一、简单对象
1. 依赖
       <!--spring整合rabbitmq-->
       <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

注:maven方式,这一个依赖即可,如果是非maven项目,需要引入5个jar如下:
在这里插入图片描述
推荐使用mavne方式,简单,非Maven项目,先用maven把以来下载本地仓库,复制到非maven的项目中即可。

2. 生产者
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <!--生产者者配置如下:-->
    <!-- 定义RabbitMQ的连接工厂 -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.vhost}"
                               connection-timeout="${rabbitmq.conTimeout}"
                               publisher-confirms="${rabbitmq.publisher-confirms}"
                               publisher-returns="${rabbitmq.publisher-returns}"/>
    <!-- 管理消息队列 -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--此处为配置文件方式 管控台配置模式需要注释  默认模式管控台 Start-->
    <!-- 定义一个队列或者多个队列  自动声明-->
    <rabbit:queue name="Queue-1" auto-declare="true" durable="true"/>

    <rabbit:topic-exchange name="exchange-1">
        <rabbit:bindings>
            <!-- 可绑定多个队列,发送的时候指定key进行发送 -->
            <rabbit:binding queue="Queue-1" pattern="ws.tjqb"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--此处为配置文件方式 管控台配置模式需要注释  默认模式管控台 End-->

    <!-- 定义交换机 自动声明-->
    <rabbit:topic-exchange name="exchange-1"
                           auto-declare="true" durable="true"/>
    <!-- 5. 配置消息对象json转换类 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
    <!-- 定义MQ消息模板
      1. id                 : 定义消息模板ID
      2.connection-factory  : 把定义的连接工厂放到消息模板中
      3.confirm-callback    : confirm确认机制
      4.return-callback     : return确认机制
      5.mandatory           :#有2种状态
                              设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除;
                              设置为 false 后 消费者在消息没有被路由到合适队列情况下会自动删除
    -->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"
                     exchange="exchange-1"
                     confirm-callback="confirmCallBackListener"
                     return-callback="returnCallBackListener"
                     mandatory="true"
                     message-converter="jsonMessageConverter"/>
</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
package com.gblfy.order.controller;

import com.gblfy.order.pojo.FisCallingTrace;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
public class Send {
    public static final String EXCHANGE = "exchange-1";
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/test")
    public String test() {
            String uuidStr = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuidStr);
        // 发送消息
        Map<String, String> map = new HashMap<>();
        map.put("email", "550731230@qq.com");
        rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);
        return "success";
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
3. 消费者
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <!--消费者配置如下:-->
    <!-- 定义RabbitMQ的连接工厂 -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"
                               password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"
                               connection-timeout="${rabbitmq.conTimeout}"
                               publisher-confirms="${rabbitmq.publisher-confirms}"
                               publisher-returns="${rabbitmq.publisher-returns}"/>
    <!-- 管理消息队列 -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- 声明多个消费者对象 -->
    <bean id="emailMessageListener" class="com.gblfy.order.mqhandler.EmailMessageListener"/>
    <!-- 监听队列
      1. connectionFactory 连接工厂
      2. manual 手动签收
      3. ref="" 消费者监听
    -->
    <rabbit:listener-container connection-factory="connectionFactory"
                               acknowledge="manual"
                               concurrency="${rabbitmq.concurrency}"
                               max-concurrency="${rabbitmq.max-concurrency}">
        <rabbit:listener ref="emailMessageListener" method="onMessage" queue-names="Queue-1"/>
    </rabbit:listener-container>
</beans>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
package com.gblfy.order.mqhandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.IOException;
@Component
public class EmailMessageListener implements MessageListener {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  @Override
  public void onMessage(Message message) {

    try {
      JsonNode jsonNode = MAPPER.readTree(message.getBody());
      String email = jsonNode.get("email").asText();
      System.out.println("获取队列中消息:" + email);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
4. 配置文件
#RabbitMQ 连接信息
#IP地址
rabbitmq.host=192.168.0.114
#端口
rabbitmq.port=5672
#用户名
rabbitmq.username=fis
#密码
rabbitmq.password=ncl@1234
#虚拟主机
rabbitmq.vhost=/app/fisMQ
#连接超时时间
rabbitmq.conTimeout=15000
#发送确认 对应RabbitTemplate.ConfirmCallback接口
#消息发送成功 有2个重要参数
# ack 状态为true correlationId 全局唯一ID用于标识每一支队列
rabbitmq.publisher-confirms=true
#发送失败回退,对应RabbitTemplate.ReturnCallback接口
rabbitmq.publisher-returns=true
#默认消费者数量
rabbitmq.concurrency=10
#最大消费者数量
rabbitmq.max-concurrency=20

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
5. spring版本

目前适配的spring版本4.2.3.RELEASE

二、复杂对象

声明:配置文件不变

2.1. 生产者
package com.gblfy.order.controller;

import com.gblfy.order.pojo.FisCallingTrace;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
public class Send {
    public static final String EXCHANGE = "exchange-1";
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/test")
    public String test() {
        FisCallingTrace f = getFisCallingTrace();

        String uuidStr = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuidStr);
        // 发送消息
        Map<String, Object> map = new HashMap<>();
        map.put("mReqXml", "请求报文");
        map.put("mResXml", "响应报文");
        map.put("mUUID", uuidStr);
        map.put("serviceName", "NYHC");
        map.put("fisCallingTrace", f);

        rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);
        return "success";
    }

    // @RequestMapping("/test")
    // public String test() {
    //     String uuidStr = UUID.randomUUID().toString();
    //     CorrelationData correlationId = new CorrelationData(uuidStr);
    //     // 发送消息
    //     Map<String, String> map = new HashMap<>();
    //     map.put("email", "550731230@qq.com");
    //     rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);
    //     return "success";
    // }
    private FisCallingTrace getFisCallingTrace() {
        FisCallingTrace f = new FisCallingTrace();
        f.setServicename("tjqb");
        f.setServicetype("1");
        f.setInterfacetype("2");
        f.setResstatus("1");
        f.setResremark("纽约数据回传接口");
        f.setReqdate(new Date());
        f.setReqtime("10:00:00");
        f.setResdate(new Date());
        f.setRestime("10:00:00");
        f.setReqxml("请求报文");
        f.setResxml("响应报文");
        return f;
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
2.2. 消费者
package com.gblfy.order.mqhandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.gblfy.order.pojo.FisCallingTrace;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class EmailMessageListener implements ChannelAwareMessageListener {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            JsonNode jsonNode = MAPPER.readTree(message.getBody());
            String mReqXml = jsonNode.get("mReqXml").asText();
            String mResXml = jsonNode.get("mResXml").asText();
            String mUUID = jsonNode.get("mUUID").asText();
            String serviceName = jsonNode.get("serviceName").asText();

            System.out.println("获取队列中消息:" + mReqXml);
            System.out.println("获取队列中消息:" + mResXml);
            System.out.println("获取队列中消息:" + mUUID);
            System.out.println("获取队列中消息:" + serviceName);
            JsonNode jsonNode1 = jsonNode.get("fisCallingTrace");

            String jsonStr = MAPPER.writeValueAsString(jsonNode1);
            FisCallingTrace f= MAPPER.readValue(jsonStr , FisCallingTrace.class);
            System.out.println("获取队列中消息:" + f.getReqxml());
            System.out.println("获取队列中消息:" + f.getResxml());

            // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
        log.info("解析操作");
        log.info("落库操作");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/956441
推荐阅读
相关标签
  

闽ICP备14008679号