当前位置:   article > 正文

SpringMVC整合RabbitMQ实现生产 消费_springmvc rabbitmq listener 開關

springmvc rabbitmq listener 開關

1、pom.xml 增加 mq依赖

<!-- Version property (goes inside <properties>) referenced by the dependency below -->
<spring-rabbit_version>2.0.4.RELEASE</spring-rabbit_version>

<!-- Spring AMQP RabbitMQ support. spring-web is excluded from its transitive dependencies;
     NOTE(review): presumably so the project's own Spring MVC version wins. Confirm. -->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>${spring-rabbit_version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>spring-web</artifactId>
                        <groupId>org.springframework</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2、config.properties 引用配置信息

# RabbitMQ connection settings, resolved as ${...} placeholders in spring-rabbitmq.xml
# mq.url is passed to the connection factory's "addresses" attribute (broker host)
mq.url=192.168.1.88
# Credentials used by the connection factory
mq.username=guest
mq.password=guest
  • 1
  • 2
  • 3
  • 4

3、创建 spring-rabbitmq.xml

在web.xml中引入

<!-- Lists spring-rabbitmq.xml as a Spring context configuration file.
     NOTE(review): this parameter is read by Spring's ContextLoaderListener, whose
     declaration is not shown here. Confirm it is registered in web.xml. -->
<context-param>
		<param-name>contextConfigLocation</param-name>
		<param-value>
			classpath:spring-rabbitmq.xml
		</param-value>
	</context-param>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

生产者:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd  http://www.springframework.org/schema/context
 http://www.springframework.org/schema/context/spring-context-2.5.xsd  http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Producer-side RabbitMQ wiring: connection, admin, template, queue, exchange and binding. -->
    <context:property-placeholder location="classpath:config.properties" />

    <!-- Broker connection; address and credentials come from config.properties -->
    <rabbit:connection-factory id="connectionFactory" addresses="${mq.url}"
                               username="${mq.username}"
                               password="${mq.password}"
                               requested-heartbeat="60" />
    <!-- RabbitAdmin: declares the queue, exchange and binding below on the broker -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Template injected into MqServiceImpl via @Resource. Without this bean the
         injection fails in a plain (non Spring Boot) application context.
         NOTE: the consumer in this article parses the message body as JSON, so the
         payload must be sent as JSON bytes. Either set a JSON message-converter on
         this template or send a JSON string instead of the raw DTO. -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />

    <!-- Durable, non-exclusive queue that receives user-update messages -->
    <rabbit:queue name="update_user_queue" auto-declare="true" durable="true" auto-delete="false" exclusive="false"/>

    <!-- Durable direct exchange; a message reaches update_user_queue only when its
         routing key is exactly "update_user_key" -->
    <rabbit:direct-exchange name="base_user_exchange" durable="true" auto-delete="false" id="base_user_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="update_user_queue" key="update_user_key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    
</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

RabbitMQ 常用的三种交换机类型:direct、fanout、topic,可根据自身业务情况选择,配置信息如下:

direct:直连型交换机(队列通过绑定 key 与交换机绑定,只有当消息的路由键与绑定 key 完全一致时,消息才会被转发到该队列)

<!-- Durable direct exchange. Direct exchanges compare the routing key literally,
     so the binding key must be the exact key (wildcards such as ".*" are not expanded). -->
<rabbit:direct-exchange name="base_user_exchange" durable="true" auto-delete="false" id="base_user_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="update_user_queue" key="update_user_key" />
        </rabbit:bindings>
</rabbit:direct-exchange>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

topic:主题型交换机(队列的绑定 key 是带通配符的模式:`*` 匹配一个单词,`#` 匹配零个或多个单词;消息的路由键与该模式匹配时才会被转发到对应队列)

<!-- Durable, auto-declared topic exchange. update_user_queue is bound with the pattern
     "update_user_key.*.*", i.e. "update_user_key" followed by exactly two more words. -->
<rabbit:topic-exchange name="base_user_exchange" auto-declare="true" durable="true">
     <rabbit:bindings>
         <rabbit:binding queue="update_user_queue" pattern="update_user_key.*.*"/>
     </rabbit:bindings>
</rabbit:topic-exchange>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

fanout:广播型交换机(忽略路由键,所有与该交换机绑定的队列都会收到消息,类似广播;发送端不关心队列是谁,由消费端自行把需要数据的队列绑定到该交换机)

<!-- Durable fanout exchange; the binding below has no key, only the queue name. -->
<rabbit:fanout-exchange name="base_user_exchange" durable="true" auto-delete="false" id="base_user_exchange">  
	    <rabbit:bindings>  
	        <rabbit:binding queue="update_user_queue"/>  
	    </rabbit:bindings>  
</rabbit:fanout-exchange>
  • 1
  • 2
  • 3
  • 4
  • 5

死信队列(消息在队列中超时未被消费等情况下,会被自动转发到死信交换机,再由死信交换机路由到死信队列中)

<!-- Delay queue: a message that stays unconsumed for 60 000 ms expires and is dead-lettered. -->
    <rabbit:queue id="delay_queue" durable="true" auto-delete="false" exclusive="false" name="delay_queue">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value  type="java.lang.Long">60000</value>
            </entry>
            <!-- x-dead-letter-exchange must name an exchange, not a queue. Expired messages
                 are republished to base_user_exchange with the routing key that binds
                 update_user_queue, so they end up in that queue. -->
            <entry key="x-dead-letter-exchange" value="base_user_exchange"/>
            <entry key="x-dead-letter-routing-key" value="update_user_key"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ exchange and queue names used by the message producer.
 *
 * <p>Pure constant holder: it is final and cannot be instantiated.
 */
public final class Constants {
    /** Name of the exchange that user messages are published to. */
    public static final String MQ_BASE_USER_EXCHANGE = "base_user_exchange";

    /** Name of the queue for user messages. */
    public static final String MQ_BASE_USER_MESSAGE_QUENE = "base_user_queue";

    /** Name of the queue for user-update messages. */
    public static final String MQ_BASE_UPDATE_USER_MESSAGE_QUENE = "update_user_queue";

    private Constants() {
        // constants only, no instances
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

发送消息MqServiceImpl

/**
 * Publishes user messages to {@code base_user_exchange} through {@link RabbitTemplate}.
 *
 * <p>The class declares no supertype here, so the methods carry no {@code @Override};
 * re-add it if the class is made to implement a service interface.
 */
@Service("mqService")
@Slf4j
public class MqServiceImpl{

	/**
	 * Routing key under which {@code update_user_queue} is bound to the direct exchange
	 * in spring-rabbitmq.xml. A direct exchange routes on this key, not on the queue name.
	 */
	private static final String UPDATE_USER_ROUTING_KEY = "update_user_key";

	@Resource
	private RabbitTemplate rabbitTemplate;

	/**
	 * Publishes a user-update message so that it is routed to {@code update_user_queue}.
	 *
	 * @param userDTO the user data to send; converted by the template's message converter
	 */
	public void sendUpdateUser(UserDTO userDTO) {
		log.info("发送消息为 : {}", userDTO);
		rabbitTemplate.convertAndSend(Constants.MQ_BASE_USER_EXCHANGE, UPDATE_USER_ROUTING_KEY, userDTO);
	}

	/**
	 * Publishes a user message with routing key {@code base_user_queue}.
	 *
	 * @param userDTO the user data to send; converted by the template's message converter
	 */
	public void sendUser(UserDTO userDTO) {
		log.info("发送消息为 : {}", userDTO);
		// NOTE(review): no binding with key "base_user_queue" is shown in the XML config;
		// confirm one exists, otherwise the direct exchange drops this message.
		rabbitTemplate.convertAndSend(Constants.MQ_BASE_USER_EXCHANGE, Constants.MQ_BASE_USER_MESSAGE_QUENE, userDTO);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

消费者:

spring-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd  http://www.springframework.org/schema/context
 http://www.springframework.org/schema/context/spring-context-2.5.xsd  http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Consumer-side RabbitMQ wiring: connection, admin, queue, exchange, binding and listener. -->
    <context:property-placeholder location="classpath:config.properties" />

    <!-- Broker connection; address and credentials come from config.properties -->
    <rabbit:connection-factory id="connectionFactory" addresses="${mq.url}"
                               username="${mq.username}"
                               password="${mq.password}"
                               requested-heartbeat="60" />
    <!-- RabbitAdmin: declares the queue, exchange and binding below on the broker -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Durable, non-exclusive queue that this application consumes from -->
    <rabbit:queue name="update_user_queue" auto-declare="true" durable="true" auto-delete="false" exclusive="false"/>

    <!-- Durable direct exchange; update_user_queue is bound with key "update_user_key" -->
    <rabbit:direct-exchange name="base_user_exchange" durable="true" auto-delete="false" id="base_user_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="update_user_queue" key="update_user_key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Listener container: delivers messages from update_user_queue to the updateUserSendHandle bean -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="update_user_queue" ref="updateUserSendHandle" />
    </rabbit:listener-container>

    <!-- NOTE(review): UpdateUserSendHandle is also annotated @Service("updateUserSendHandle");
         if component scanning covers its package this definition is a duplicate. Keep only one. -->
	<bean id="updateUserSendHandle" class="com.mlxc.base_user.mq.UpdateUserSendHandle"></bean>
</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

4、消费者业务处理

UpdateUserSendHandle.java


import com.alibaba.fastjson.JSON;
import com.mlxc.common.dto.base_user.UserDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;

/**
 * RabbitMQ listener for user-update messages.
 *
 * <p>Each message body is parsed as JSON into a {@link UserDTO} and logged.
 *
 * @author BennBo
 * @version 1.0
 * @date 2020/12/22 9:41
 */
@Service("updateUserSendHandle")
public class UpdateUserSendHandle implements MessageListener {

    private static final Logger log = LoggerFactory.getLogger(UpdateUserSendHandle.class);

    /**
     * Handles one delivered message.
     *
     * @param message the AMQP message; its body is expected to be JSON for a {@code UserDTO}
     */
    @Override
    public void onMessage(Message message) {
        UserDTO userDTO = JSON.parseObject(message.getBody(), UserDTO.class);
        // Log through SLF4J rather than stdout so the parsed payload shows up in the application log.
        log.info("UpdateUserSendHandle received: {}", userDTO);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/139121
推荐阅读
相关标签
  

闽ICP备14008679号