赞
踩
消息队列 -- 流量削峰
1.简单模式
2.工作模式
3.发布订阅模式
4.路由模式
5.主题模式
6.RPC模式
第一步发送“半消息”
第二步执行本地事务
第三步对事务消息进行提交或回滚:
在RabbitMQ网站上查看Queues
指定查看消息队列的条数
处理生产者发送消息,消费者在没处理完手里消息的情况继续接收消息
Ready:等待被消费的消息;Unacked:已投递给消费者但尚未确认的消息;Total:队列中的消息总数(Ready + Unacked)(如图)
- <dependency>
-
- <groupId>org.springframework.boot</groupId>
-
- <artifactId>spring-boot-starter-amqp</artifactId>
-
- </dependency>
- spring:
-
- application:
-
- name:
-
- rabbitmq:
-
- host: 127.0.0.1
-
- username: guest
-
- password: guest
-
- port: 5672
-
- virtual-host: /ems
- @RabbitListener(queuesToDeclare = @Queue(value = "队列名", durable = "true", autoDelete = "true")) --- 用于消费者,写在 ServiceImpl 类上(durable 表示队列是否持久化,autoDelete 表示是否自动删除)
-
- @RabbitHandler 代表从队列取出消息时的回调方法,写在方法上,需配合上面的 @RabbitListener 注解一起使用。
-
- 注入rabbitTemplate
主题模式通配符:# 匹配 0 个或多个单词,* 恰好匹配 1 个单词
MQ的应用场景:异步处理(串行方式,并行方式,消息队列)、应用解耦、流量削峰
核心解决问题:当集群中某一时刻 master 节点宕机时,可以对 Queue 中的信息进行备份。
主备架构-- 主从复制集群
rabbitmqctl cluster_status --查看rabbitMQ集群状态
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
-
-
- @Component
- public class RabbitMqReceiveHandler {
- private static final Logger log = LoggerFactory.getLogger(RabbitMqReceiveHandler.class);
- private final LogEventSubscribeService logEventSubscribeServiceImpl;
- private final LogApiService logApiServiceImpl;
- private final LogEmailService logEmailServiceImpl;
-
- /**
-  * Creates the handler and stores the three log services passed in.
-  *
-  * @param logEventSubscribeServiceImpl service that receives the messages consumed from the queue
-  * @param logApiServiceImpl API log service kept as a field of this handler
-  * @param logEmailServiceImpl e-mail log service kept as a field of this handler
-  */
- public RabbitMqReceiveHandler(
-         LogEventSubscribeService logEventSubscribeServiceImpl,
-         LogApiService logApiServiceImpl,
-         LogEmailService logEmailServiceImpl) {
-     this.logEmailServiceImpl = logEmailServiceImpl;
-     this.logApiServiceImpl = logApiServiceImpl;
-     this.logEventSubscribeServiceImpl = logEventSubscribeServiceImpl;
- }
-
- //监听队列
- @RabbitListener(queues = {RabbitmqConfig.QUEUE_TESTEVENT})
- public void receive_lenovo(Message message, Channel channel){
- if(logEventSubscribeServiceImpl.handlingSubscribedMessages(message)){
- log.info("消息处理完成....");
- }
- }
- import cn.hutool.json.JSONUtil;
- import com.alibaba.fastjson.JSONObject;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
-
- import java.util.HashMap;
- import java.util.Map;
-
-
- @Configuration
- public class RabbitmqConfig {
- private static final Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);
- public static final String QUEUE_TESTEVENT = "testEvent";
- public static final String TESTEVENT_EXCHANGE="testEvent_Exchange";
- public static final String ROUTINGKEY_TESTEVENT="testEvent_routeKey";
- public static final boolean DURABLE= true; // 是否持久化
- public static final boolean EXCLUSIVE= false; //是否排他(排他队列仅限声明它的连接使用)
- public static final boolean AUTO_DELETE= false; //是否自动删除
-
- @Autowired
- private ApolloConfig apolloConfig;
- @Value("${rabbit-env}")
- private String rabbitEnv;
- //声明队列
- @Bean(QUEUE_TESTEVENT)
- public Queue testEventQueue() {
- JSONObject rabitCfg = apolloConfig.getPropertyForObject(ApolloEnum.RABIT_CFG.getValue()
- , JSONObject.class);
- log.info("raw mq config:{}", rabitCfg.toJSONString());
- JSONObject configEnv = rabitCfg.getJSONObject(rabbitEnv);
- log.info("Online mq config:{}", configEnv.toJSONString());
- Map<String, Object> map = new HashMap<>(10);
- map.put("x-message-ttl", Integer.parseInt(configEnv.getString(ApolloEnum.TTL.getValue()))*1000);
- return new Queue(QUEUE_TESTEVENT, DURABLE, EXCLUSIVE, AUTO_DELETE, map);
- }
-
- //Direct交换机
- @Bean(TESTEVENT_EXCHANGE)
- public DirectExchange testEventExchange() {
- return new DirectExchange(TESTEVENT_EXCHANGE, DURABLE, AUTO_DELETE);
- }
- /**
-  * Binds the test-event queue to the test-event exchange with the routing key
-  * {@code ROUTINGKEY_TESTEVENT} and no extra binding arguments.
-  *
-  * @param queue the queue bean qualified as {@code QUEUE_TESTEVENT}
-  * @param exchange the exchange bean qualified as {@code TESTEVENT_EXCHANGE}
-  * @return the binding between the two
-  */
- @Bean
- public Binding bindingDirect(
-         @Qualifier(QUEUE_TESTEVENT) Queue queue,
-         @Qualifier(TESTEVENT_EXCHANGE) Exchange exchange) {
-     return BindingBuilder
-             .bind(queue)
-             .to(exchange)
-             .with(ROUTINGKEY_TESTEVENT)
-             .noargs();
- }
-
- /**
-  * Builds the RabbitMQ connection factory from the connection settings stored in Apollo.
-  *
-  * <p>Host, username and password are trimmed before use. The virtual host is fixed to
-  * {@code "/"}. The password is never written to the log.
-  *
-  * @return the configured connection factory
-  */
- @Bean
- public CachingConnectionFactory cachingConnectionFactory() {
-     CachingConnectionFactory factory = new CachingConnectionFactory();
-     log.info("开始获取rabbitmq连接信息");
-     RabbitConfigVm rabbitConfig = this.getRabitCfgFromApollo();
-     // Log only the non-secret fields; serialising the whole object would expose the password.
-     log.info("rabbitmq连接信息: host={}, port={}, username={}",
-             rabbitConfig.getHost(), rabbitConfig.getPort(), rabbitConfig.getUsername());
-     factory.setHost(rabbitConfig.getHost().trim());
-     factory.setPort(rabbitConfig.getPort());
-     factory.setUsername(rabbitConfig.getUsername().trim());
-     factory.setPassword(rabbitConfig.getPassword().trim());
-     factory.setVirtualHost("/");
-     return factory;
- }
-
- /**
-  * Reads the RabbitMQ connection settings for the current environment ({@code rabbit-env})
-  * from the Apollo configuration.
-  *
-  * <p>The raw configuration and the password are intentionally kept out of the log.
-  *
-  * @return host, port, username and password for the current environment
-  * @throws IllegalStateException if the configuration has no section for the current environment
-  * @throws NumberFormatException if the configured port is not an integer
-  */
- private RabbitConfigVm getRabitCfgFromApollo() {
-     JSONObject rabitCfg = apolloConfig.getPropertyForObject(ApolloEnum.RABIT_CFG.getValue(), JSONObject.class);
-     log.info("****rabbitmq env:{}", rabbitEnv);
-     JSONObject configEnv = rabitCfg.getJSONObject(rabbitEnv);
-     if (configEnv == null) {
-         throw new IllegalStateException("No RabbitMQ configuration found for environment: " + rabbitEnv);
-     }
-     RabbitConfigVm rabbitConfigVm = new RabbitConfigVm();
-     rabbitConfigVm.setHost(configEnv.getString(ApolloEnum.HOST_NAME.getValue()));
-     rabbitConfigVm.setPort(Integer.parseInt(configEnv.getString(ApolloEnum.PORT.getValue())));
-     rabbitConfigVm.setUsername(configEnv.getString(ApolloEnum.USER_NAME.getValue()));
-     rabbitConfigVm.setPassword(configEnv.getString(ApolloEnum.PASSWORD.getValue()));
-     // Only non-secret fields are logged.
-     log.info("****apollo rabbit config**: host={}, port={}, username={}",
-             rabbitConfigVm.getHost(), rabbitConfigVm.getPort(), rabbitConfigVm.getUsername());
-     return rabbitConfigVm;
- }
- /**
-  * Spring component that publishes messages to the test-event exchange.
-  */
- @Component
- public class RabbitMQProducerUtils {
-
-     /** Template used to publish messages; injected by Spring. */
-     @Autowired
-     private RabbitTemplate rabbitTemplate;
-
-     /**
-      * Publishes a message to {@code RabbitmqConfig.TESTEVENT_EXCHANGE} with the routing key
-      * {@code RabbitmqConfig.ROUTINGKEY_TESTEVENT}.
-      *
-      * @param paramsStr the message payload
-      */
-     public void sendMessage(String paramsStr) {
-         final String exchange = RabbitmqConfig.TESTEVENT_EXCHANGE;
-         final String routingKey = RabbitmqConfig.ROUTINGKEY_TESTEVENT;
-         rabbitTemplate.convertAndSend(exchange, routingKey, paramsStr);
-     }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。