赞
踩
.我们为什么要事件总线,对于一个大型的项目这点是很重要的,以为服务之间用MQ的地方很多,每次都要写重复的代码,导致代码的可读性差,而且加大了代码的复杂性。对其进行封装后,这些都可以解决。那么我们开始进行封装吧
2. 首先我们需要spring_boot整合的rabbit MQ包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
public interface EventConstact {
/**
* 创建交换机的名称
*/
String EXCHANGE_NAME="event-exchange";
/**
* 事件类型常量
*/
String EVENT_HOTEL_INSERT="hotel_insert";
}
按理来说你的事件是可以无限增加的,业务需求也是,所以你可以定义很多事件类型
创建交换机
import com.qf.event.constact.EventConstact; import org.springframework.amqp.core.DirectExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //创建交换机 @Configuration public class EventPublishConfiguration { /** *事件发布者需要创建 * 事件接收者也需要创建 * * 创建一个交换机 */ @Bean public DirectExchange getExchange(){ return new DirectExchange(EventConstact.EXCHANGE_NAME,true,false); } }
创建队列。以及交换机,路由键的绑定
@Configuration @ConditionalOnBean(EventListener.class) public class EventConsumerConfiguration { //保证每种服务的name名一样 @Value("${spring.application.name}") public String name; //获得所有EventListener对象 @Autowired public List<EventListener> eventListeners; @Autowired SpringContextUtil springContextUtil; /** * 创建队列(消费者来创建) */ @Bean public Queue getQueue(){ return new Queue(name+"-queue",true,false,false); } /** * 队列和交换机的绑定 * @param getQueue 默认是方法名 * @param getExchange 默认是方法名 * @return */ @Bean public Binding getBinding(Queue getQueue, DirectExchange getExchange){ //循环所有的EventListener实现类 for (EventListener eventListener : eventListeners) { //获得当前处理器需要处理的事件类型-路由键 String eventType = eventListener.gteEventType(); System.out.println("绑定的路由键:"+eventType); //队列通过路由键绑定交换机 Binding binding= BindingBuilder.bind(getQueue).to(getExchange).with(eventType); //动态将binding对象注入spring容器中 springContextUtil.registerBean(eventType+eventListener.hashCode(),binding); } return null; } }
手动把binding对象注册到springioc容器中
import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; import org.springframework.stereotype.Component; import java.util.function.Supplier; /** * Spring -> IOC -> Bean工厂 * Map<beanName, BeanDefintion> * * 手动将Bean注册到IOC容器中 * * Bean -> BeanDefition * */ @Component public class SpringContextUtil implements BeanDefinitionRegistryPostProcessor { //注册bean的核心对象 private BeanDefinitionRegistry beanDefinitionRegistry; /** * 自定义工具方法 - 注册Bean对象 */ public void registerBean(String beanName, Object bean){ //将bean封装成BeanDefinition对象 BeanDefinitionBuilder beanDefition = BeanDefinitionBuilder.genericBeanDefinition(bean.getClass(), new Supplier() { @Override public Object get() { return bean; } }); //将BeanDefintion注册到Spring容器中 beanDefinitionRegistry.registerBeanDefinition(beanName, beanDefition.getBeanDefinition()); } @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { this.beanDefinitionRegistry = registry; } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { } }
发布事件
import com.qf.event.constact.EventConstact; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; //发布者把事件广播出去 @Component public class EventUtil { @Autowired private RabbitTemplate rabbitTemplate; public void publishEvent(String evenType,Object msg){ //第一个参数是交换机的名称,第二是事件名称(类型),第三是事件内容 System.out.println("广播了一个事件:"+evenType); rabbitTemplate.convertAndSend(EventConstact.EXCHANGE_NAME,evenType,msg); } }
下面就是核心方法了,监听方法
//Rabbitmq监听器 @Component @ConditionalOnBean(EventListener.class) public class RabbitMQListener { @Autowired private List<EventListener> eventListeners; /** * 监听指定队列 */ @RabbitListener(queues = "${spring.application.name}-queue") public void msgHandler(Message message){ //获得发布消息的路由键 - 事件类型 String routingKey = message.getMessageProperties().getReceivedRoutingKey(); //交给处EvenListener理器处理 for (EventListener eventListener : eventListeners) { //判断事件类型是否匹配 if (eventListener.gteEventType().equals(routingKey)){ //获得队列中的消息 byte[] body = message.getBody(); //反序列化后用EventListener对象中的evenHandler方法来处理该消息 eventListener.eventHandler(SerializationUtils.deserialize(body)); } } } }
监听者实现这个方法就可以得到发布的事件了
/**
* 接收者必须实现
*/
public interface EventListener<T> {
/**
* 监听的事件类型
*/
String gteEventType();
/**
* 事件处理的方法
*/
void eventHandler(T msg);
}
这里就完成了对MQ事件总线的封装了,喜欢的可以点个赞。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。