赞
踩
1、pom.xml 增加 mq依赖
<spring-rabbit_version>2.0.4.RELEASE</spring-rabbit_version>
<!-- Spring AMQP RabbitMQ support; the version is taken from the spring-rabbit_version property -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-rabbit_version}</version>
<!-- The transitive spring-web is excluded; presumably the project declares its own spring-web version (confirm) -->
<exclusions>
<exclusion>
<artifactId>spring-web</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
2、config.properties 引用配置信息
#mq config
# Broker address; no port is specified here
mq.url=192.168.1.88
# NOTE(review): RabbitMQ restricts the built-in guest account to localhost connections by default,
# so a broker on another host normally needs a dedicated user - confirm the broker settings
mq.username=guest
mq.password=guest
3、创建 spring-rabbitmq.xml
在web.xml中引入
<!-- Registers classpath:spring-rabbitmq.xml as a Spring context configuration location.
     NOTE(review): presumes a ContextLoaderListener is declared elsewhere in web.xml - confirm -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>
classpath:spring-rabbitmq.xml
</param-value>
</context-param>
生产者:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Resolves the ${mq.*} placeholders from config.properties -->
    <context:property-placeholder location="classpath:config.properties" />

    <!-- Broker connection -->
    <rabbit:connection-factory id="connectionFactory"
                               addresses="${mq.url}"
                               username="${mq.username}"
                               password="${mq.password}"
                               requested-heartbeat="60" />

    <!-- Declares the queues, exchanges and bindings below on the broker -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Serialises payloads as JSON so that a consumer parsing the body as JSON can read them.
         Requires jackson-databind on the classpath. -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Template injected into the sending service; without this bean the RabbitTemplate injection cannot be satisfied -->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"
                     message-converter="jsonMessageConverter" />

    <!-- Durable, non-exclusive queue that is not auto-deleted -->
    <rabbit:queue name="update_user_queue" auto-declare="true" durable="true" auto-delete="false" exclusive="false"/>

    <!-- Durable direct exchange; update_user_queue receives messages whose routing key is exactly update_user_key -->
    <rabbit:direct-exchange name="base_user_exchange" durable="true" auto-delete="false" id="base_user_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="update_user_queue" key="update_user_key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>
RabbitMQ 常用的三种交换机类型:direct、fanout、topic,可根据自身业务情况选择,配置信息如下:
direct:直连型交换机(队列以绑定 key 绑定到交换机,只有消息的路由键与绑定 key 完全一致时,消息才会被转发到该队列)
<!-- Direct exchange, durable, declared automatically. A direct exchange compares the routing key
     with the binding key literally, so the key must not contain topic wildcards. -->
<rabbit:direct-exchange name="base_user_exchange" durable="true" auto-delete="false" id="base_user_exchange">
    <rabbit:bindings>
        <rabbit:binding queue="update_user_queue" key="update_user_key" />
    </rabbit:bindings>
</rabbit:direct-exchange>
topic:主题型交换机(队列以带通配符的模式绑定到交换机:* 匹配一个单词,# 匹配零个或多个单词,单词之间以 . 分隔;发送端仍然使用具体的 routing key 发送消息,只有 routing key 与绑定模式匹配时,消息才会被转发到该队列)
<!-- Topic exchange, durable, declared automatically. In the binding pattern each * stands for exactly one
     dot-separated word, so update_user_key.*.* matches routing keys made of update_user_key plus two more words. -->
<rabbit:topic-exchange name="base_user_exchange" auto-declare="true" durable="true">
<rabbit:bindings>
<rabbit:binding queue="update_user_queue" pattern="update_user_key.*.*"/>
</rabbit:bindings>
</rabbit:topic-exchange>
fanout :广播型交换机(客户端中只要是与该路由绑定在一起的队列都会收到相关消息,这类似广播,发送端不管队列是谁,都由客户端自己去绑定,谁需要数据谁去绑定自己的相应队列)
<!-- Fanout exchange, durable, not auto-deleted. The binding carries no key: a fanout exchange
     delivers each message to every queue bound to it. -->
<rabbit:fanout-exchange name="base_user_exchange" durable="true" auto-delete="false" id="base_user_exchange">
<rabbit:bindings>
<rabbit:binding queue="update_user_queue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
死信队列(消息超时未被消费、被消费者拒绝且不重新入队、或队列达到长度上限时,会被自动转发到队列上配置的死信交换机,再由该交换机路由到目标队列)
<!-- Holding queue: a message left unconsumed for 60000 ms expires and is dead-lettered -->
<rabbit:queue id="delay_queue" durable="true" auto-delete="false" exclusive="false" name="delay_queue">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">60000</value>
        </entry>
        <!-- x-dead-letter-exchange must name an exchange, not a queue. Expired messages are republished
             to base_user_exchange with the key that update_user_queue is bound with. -->
        <entry key="x-dead-letter-exchange" value="base_user_exchange"/>
        <entry key="x-dead-letter-routing-key" value="update_user_key"/>
    </rabbit:queue-arguments>
</rabbit:queue>
import java.util.HashMap;
import java.util.Map;
/**
 * Names of the RabbitMQ exchange and queues shared by the messaging code.
 *
 * <p>This is a pure constants holder: it is final and cannot be instantiated.
 * The {@code QUENE} spelling in the field names is kept so existing references keep compiling.
 */
public final class Constants {
    /** Name of the user exchange. */
    public static final String MQ_BASE_USER_EXCHANGE = "base_user_exchange";
    /** Name of the base user queue. */
    public static final String MQ_BASE_USER_MESSAGE_QUENE = "base_user_queue";
    /** Name of the queue that carries user-update messages. */
    public static final String MQ_BASE_UPDATE_USER_MESSAGE_QUENE = "update_user_queue";

    private Constants() {
        // constants only; no instances
    }
}
发送消息MqServiceImpl
/**
 * Publishes user messages to the {@code base_user_exchange} exchange through a {@link RabbitTemplate}.
 *
 * <p>NOTE(review): the class declares no supertype, so {@code @Override} on its methods does not compile
 * and has been removed. If a service interface exists, add it to the class header and restore the annotation.
 */
@Service("mqService")
@Slf4j
public class MqServiceImpl {

    /**
     * Key under which {@code update_user_queue} is bound to the direct exchange. A direct exchange
     * routes on an exact key match, so the queue name cannot be used as the routing key.
     */
    private static final String UPDATE_USER_ROUTING_KEY = "update_user_key";

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a user-update message.
     *
     * @param userDTO the payload handed to the template's message converter
     */
    public void sendUpdateUser(UserDTO userDTO) {
        log.info("发送消息为 : {}", userDTO);
        rabbitTemplate.convertAndSend(Constants.MQ_BASE_USER_EXCHANGE, UPDATE_USER_ROUTING_KEY, userDTO);
    }

    /**
     * Sends a user message.
     *
     * <p>NOTE(review): the routing key used here is the {@code base_user_queue} name; no binding with that
     * key is visible in the accompanying configuration - confirm that one exists.
     *
     * @param userDTO the payload handed to the template's message converter
     */
    public void sendUser(UserDTO userDTO) {
        log.info("发送消息为 : {}", userDTO);
        rabbitTemplate.convertAndSend(Constants.MQ_BASE_USER_EXCHANGE, Constants.MQ_BASE_USER_MESSAGE_QUENE, userDTO);
    }
}
消费者:
spring-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Resolves the ${mq.*} placeholders from config.properties -->
    <context:property-placeholder location="classpath:config.properties" />

    <!-- Broker connection -->
    <rabbit:connection-factory id="connectionFactory"
                               addresses="${mq.url}"
                               username="${mq.username}"
                               password="${mq.password}"
                               requested-heartbeat="60" />

    <!-- Declares the queues, exchanges and bindings below on the broker -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Durable, non-exclusive queue that is not auto-deleted -->
    <rabbit:queue name="update_user_queue" auto-declare="true" durable="true" auto-delete="false" exclusive="false"/>

    <!-- Durable direct exchange; update_user_queue receives messages whose routing key is exactly update_user_key -->
    <rabbit:direct-exchange name="base_user_exchange" durable="true" auto-delete="false" id="base_user_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="update_user_queue" key="update_user_key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Delivers every message from update_user_queue to the updateUserSendHandle bean -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="update_user_queue" ref="updateUserSendHandle" />
    </rabbit:listener-container>

    <!-- NOTE(review): the handler class is also annotated with @Service under the same bean name;
         if component scanning covers its package, keep only one of the two registrations -->
    <bean id="updateUserSendHandle" class="com.mlxc.base_user.mq.UpdateUserSendHandle"></bean>
</beans>
4、消费者业务处理
UpdateUserSendHandle.java
import com.alibaba.fastjson.JSON; import com.mlxc.common.dto.base_user.UserDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Service; /** * @author BennBo * @version 1.0 * @description 修改用户信息mq监听 * @date 2020/12/22 9:41 */ @Service("updateUserSendHandle") public class UpdateUserSendHandle implements MessageListener { private static Logger log = LoggerFactory.getLogger(UpdateUserSendHandle.class); @Override public void onMessage(Message message) { UserDTO userDTO = JSON.parseObject(message.getBody(), UserDTO.class); System.out.println("UpdateUserSendHandle"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。