赞
踩
使用mq发送邮件的优点在于:
这里我们以用户注册后需要同时发送邮件和短信这个场景做为示例,流程图如下所示。
不介绍rabbitMQ的基础信息了,直接进入代码环节。
pom文件中引入相关依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- <!-- 版本继承springboot -->
- </dependency>
application.yml文件配置
- # rabbitmq
- spring:
- rabbitmq:
- port: 5672
- host: localhost
- username: guest
- password: guest
- virtual-host: /
- publisher-returns: true #开启生产者手动确认
- publisher-confirm-type: correlated #消息确认类型
做完这些就已经成功将rabbitMQ引入到Springboot中了,接下来是生产者中的配置类,这里使用的推送方式是topic
topic交换器是指按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的BindingKey进行模糊匹配,如果匹配成功,将消息分发到该Queue。 Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding
Key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class LoginRabbitConfig {
-
- private static final String EXCHANGE_NAME_TOPIC = "测试用Topic交换机";
-
- // 声明交换机 topic
- @Bean
- public TopicExchange topicExchange() {
- // 是否持久化、是否自动删除
- return new TopicExchange(EXCHANGE_NAME_TOPIC, true, false);
- }
-
- }

模拟一下注册的接口,因为注册后需要发送邮件和短信提醒用户,如果按照平时的顺序调用不仅耗时长,并且一旦邮件或短信发送失败没有进行异常处理话会导致注册失败,因此采用消息队列能够很好的解决这一问题,代码中UserPOJO只是定义的一个实体类
- import com.alibaba.fastjson.JSON;
- import gwc.mq.pojo.UserPOJO;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.UUID;
-
- @RestController
- public class LoginController {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @RequestMapping("/register")
- public String register(UserPOJO userPOJO) {
- Object msg = JSON.toJSONString(userPOJO);
- // 设置ConfirmCallback
- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- if (ack) {
- // 消息发送成功
- System.out.println("消息发送成功, correlationData: " + correlationData.getReturnedMessage());
- } else {
- // 消息发送失败
- System.out.println("消息发送失败, cause: " + cause);
- }
- });
- try {
- rabbitTemplate.convertAndSend("测试用Topic交换机", "*", msg, new CorrelationData(UUID.randomUUID().toString()));
- } catch (Exception e) {
- e.printStackTrace();
- // 失败处理 无论出现那种情况都将错误消息存到redis中 然后用定时任务统一发送
- }
-
-
- return "用户-" + userPOJO.getName() + "-注册成功!";
- }
-
- }

在配置类中,我们打开了消息发送方的消息确认机制,因此在这里我们需要setConfirmCallback函数,其中correlationData是具体的消息,ack表示是否发送成功,cause则是失败的具体原因。
发送方发送失败的原因有三种可归为
(1)producter连接mq失败,消息没有发送到mq
(2)producter连接mq成功,但是发送到exchange失败
(3)消息发送到exchange成功,但是路由到queue失败
无论出现哪一种异常,我们都可以通过try catch来进行错误消息的处理,我采用的是捕获到错误后将消息存入db中(redis),再通过springboot的定时任务进行统一的重发,存入db代码就不再描述。
application.yml文件配置
- #rabbitMQ
- spring:
- rabbitmq:
- port: 5672
- host: localhost
- username: guest
- password: guest
- virtual-host: /
- listener:
- simple:
- acknowledge-mode: manual #开启手动确认机制
邮件系统的配置类
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class MailRabbitConfig {
- private static final String EXCHANGE_NAME_TOPIC = "测试用Topic交换机";
-
- private static final String QUEUE_NAME = "email_queue";
-
- private static final String ERR_EXCHANGE_NAME_DIRECT = "死信交换机";
-
- private static final String ERR_QUEUE_NAME = "err_email_queue";
-
- // 声明交换机 topic
- @Bean
- public TopicExchange topicExchange() {
- // 是否持久化、是否自动删除
- return new TopicExchange(EXCHANGE_NAME_TOPIC, true, false);
- }
-
- // 声明队列
- @Bean
- public Queue queue() {
- // 是否持久化、是否当前连接对象独占、是否自动删除
- return new Queue(QUEUE_NAME, true, false, false);
- }
-
- // 声明绑定关系
- @Bean
- public Binding queueBinding(Queue queue, TopicExchange topicExchange) {
- return BindingBuilder.bind(queue).to(topicExchange).with("*.mail");
- }
-
-
- // 声明死信交换机 direct
- @Bean
- public DirectExchange directExchange() {
- // 是否持久化、是否自动删除
- return new DirectExchange(ERR_EXCHANGE_NAME_DIRECT, true, false);
- }
-
- // 声明死信队列
- @Bean
- public Queue errQueue() {
- // 是否持久化、是否当前连接对象独占、是否自动删除
- return new Queue(ERR_QUEUE_NAME, true, false, false);
- }
-
- // 声明绑定关系
- @Bean
- public Binding errQueueBinding(@Qualifier("errQueue")Queue errQueue, @Qualifier("directExchange")DirectExchange directExchange) {
- return BindingBuilder.bind(errQueue).to(directExchange).with("err.mail");
- }
-
-
- }

邮件发送服务进行监听
- import com.alibaba.fastjson.JSON;
- import com.rabbitmq.client.Channel;
- import gwc.mq.pojo.UserPOJO;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
-
- // 死信队列
- // 消息出现问题后则会进入死信交换机,然后进入死信队列
- // 建立一个消费者根据routingkey监听死信队列 即可处理不同的死信队列中的数据
-
- @Component
- public class MailListener {
-
- private static final int MAX_RETRY = 3;
- private static final String ERR_EXCHANGE_NAME_DIRECT = "死信交换机";
-
- private static final String ERR_QUEUE_NAME = "err_email_queue";
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Autowired
- private TopicExchange topicExchange;
-
- @Autowired
- private AmqpAdmin amqpAdmin;
-
- @RabbitListener(queues = "email_queue")
- public void sendMail(Message message, Channel channel) throws IOException {
- try {
-
- // 睡眠1秒
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- String s = new String(message.getBody());
- UserPOJO userPOJO = JSON.parseObject(s, UserPOJO.class);
-
- int a = 1 / 0; // 造成异常
-
- System.out.println("Mail系统收到 userPOJO=" + userPOJO);
-
- } catch (Exception e) {
- int retryCount = getRetryCount(message);
- System.out.println("Mail系统出现异常 当前retryCount =" + retryCount);
- if (retryCount < MAX_RETRY) {
- // 重试次数未达到最大次数,将消息重新发送到主队列,并增加重试次数
- MessageProperties properties = message.getMessageProperties();
- properties.setHeader("retry_count", retryCount + 1);
- rabbitTemplate.send(topicExchange.getName(), "info.mail", message);
- } else {
-
- // 设置队列属性
- Map<String, Object> arguments = new HashMap<String, Object>();
- // 设置队列的TTL
- arguments.put("x-message-ttl", 10000);
- arguments.put("x-dead-letter-exchange", ERR_EXCHANGE_NAME_DIRECT);// 设置死信队列的交换器名称
- arguments.put("x-dead-letter-routing-key", "err.mail");// 设置死信队列的路由键
-
- // 发送给TTL队列
- amqpAdmin.declareQueue(new Queue("TTL_email_queue", true, false, false, arguments));
-
- amqpAdmin.declareBinding(new Binding("TTL_email_queue", Binding.DestinationType.QUEUE, "死信交换机", "ttl_email", null));
-
- // 发送消息到TTL队列 此队列无消费者 在消息过期后会自动转发到配置的死信队列中去
- rabbitTemplate.send(ERR_EXCHANGE_NAME_DIRECT, "ttl_email", message);
- }
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
- return;
- }
-
- // 需要消息确认ACK
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 这里false表示是否开启批量应答
- /*
- 何为批量应答?
- 比如说channel上有传送tag的消息5,6,7,8,当前tag是8,那么此时5-8的这些还未应答的消息都会被确认收到消息应答 即批量应答
- */
- }
-
- // 死信消费
- @RabbitListener(queues = ERR_QUEUE_NAME)
- public void doFailedInformation(Message message, Channel channel) throws IOException {
-
- // 再次消费信息
- try {
- String s = new String(message.getBody());
- UserPOJO userPOJO = JSON.parseObject(s, UserPOJO.class);
- System.out.println("死信消费者消费死信队列: " + userPOJO);
- // 消费消息
-
- } catch (Exception e) {
- e.printStackTrace();
- // 表中字段 success修改为0
- // 发送一封邮件给操作人
- }
-
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-
- }
-
- // 获取重试次数
- private int getRetryCount(Message message) {
- Object retryCountObj = message.getMessageProperties().getHeaders().get("retry_count");
- if (retryCountObj instanceof Integer) {
- return (Integer) retryCountObj;
- }
- return 0;
- }
-
- }

两个方法分别监听业务队列和死信队列,如果消息消费出现异常,则重新将消息放入队列尾部,如果重试次数达到三次则将此消息放入TTL队列中,TTL队列中的消息会根据配置的过期时间、死信交换机、以及死信交换机上的routingkey对消息进行投送,进入相应的死信队列,然后再通过死信消费者进行消费处理,此时若再次发送失败,则发送邮件提醒人员进行手工发送来确保消息的有效性。
消费者确保消息的可靠性通过下示代码进行消息的确认
- /*
- (1)channel.basicAck 用于肯定确认,RabbitMQ已经知道该消息并且成功地处理消息,可以将其丢弃了
- (2)channel.basicNack 用于否定确认
- (3)channel.basicReject 用于否定确认,与channel.basicNack相比少一个参数,不处理该消息了直接
- 拒绝,可以将其丢弃了
- */
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
消息发送的流程大致可化为 生产者(业务接口) -> mq -> 交换机 -> 队列 -> 消费者
生产者消息确认机制可以确保在前半部分的有效性,消费者手动确认机制可以确保在后半部分的有效性,而一旦消息连续失败多次,我们还有保底方案通过定时任务扫描DB获取失败的消息转而通过人工发送,这样就可以在全流程上确保消息的可靠性了,这里仅仅是我个人的一套保证可靠性的方案,如果有其他更为可行的方案欢迎评论区补充
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。