赞
踩
# rabbitmq
spring.rabbitmq.host=
spring.rabbitmq.port=
#spring.rabbitmq.virtual-host=
spring.rabbitmq.username=
spring.rabbitmq.password=
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100

# mail
spring.mail.host=smtp.163.com
spring.mail.username=
spring.mail.password=
spring.mail.from=
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
@Component @Slf4j public class MailUtil { @Value("${spring.mail.from}") private String from; @Autowired private JavaMailSender mailSender; /** * 发送简单邮件 * * @param mail */ public boolean send(Mail mail) { String to = mail.getTo();// 目标邮箱 String title = mail.getTitle();// 邮件标题 String content = mail.getContent();// 邮件正文 SimpleMailMessage message = new SimpleMailMessage(); message.setFrom(from); message.setTo(to); message.setSubject(title); message.setText(content); try { mailSender.send(message); log.info("邮件发送成功"); return true; } catch (MailException e) { log.error("邮件发送失败, to: {}, title: {}", to, title, e); return false; } } /** * 发送附件邮件 * * @param mail 邮件 * @param file 附件 */ public boolean sendAttachment(Mail mail, File file) { String to = mail.getTo(); String title = mail.getTitle(); String content = mail.getContent(); MimeMessage message = mailSender.createMimeMessage(); try { MimeMessageHelper helper = new MimeMessageHelper(message, true); helper.setFrom(from); helper.setTo(to); helper.setSubject(title); helper.setText(content); FileSystemResource resource = new FileSystemResource(file); String fileName = file.getName(); helper.addAttachment(fileName, resource); mailSender.send(message); log.info("附件邮件发送成功"); return true; } catch (Exception e) { log.error("附件邮件发送失败, to: {}, title: {}", to, title, e); return false; } } }
@Slf4j public class JsonUtil { private static ObjectMapper objectMapper = new ObjectMapper(); private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; static { // 对象的所有字段全部列入 objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS); // 取消默认转换timestamps形式 objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); // 忽略空bean转json的错误 objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); // 统一日期格式 objectMapper.setDateFormat(new SimpleDateFormat(DATE_FORMAT)); // 忽略在json字符串中存在, 但在java对象中不存在对应属性的情况, 防止错误 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } public static <T> String objToStr(T obj) { if (null == obj) { return null; } try { return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj); } catch (Exception e) { log.warn("objToStr error: ", e); return null; } } public static <T> T strToObj(String str, Class<T> clazz) { if (StringUtils.isBlank(str) || null == clazz) { return null; } try { return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz); } catch (Exception e) { log.warn("strToObj error: ", e); return null; } } public static <T> T strToObj(String str, TypeReference<T> typeReference) { if (StringUtils.isBlank(str) || null == typeReference) { return null; } try { return (T) (typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference)); } catch (Exception e) { log.error("strToObj error", e); return null; } } }
public class MessageHelper { public static Message objToMsg(Object obj){ if (null == obj){ return null; } Message message = MessageBuilder.withBody(JsonUtil.objToStr(obj).getBytes()).build(); message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化 message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON); return message; } public static <T> T msgToObj(Message message,Class<T> clazz){ if (null == message || null == clazz){ return null; } String str = new String(message.getBody()); T obj = JsonUtil.strToObj(str, clazz); return obj; } }
@Configuration @Slf4j public class RabbitConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private MsgLogService msgLogService; @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(converter()); //消息是否成功发送到 Exchange CorrelationData rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{ if (ack){ log.info("消息成功发送到Exchange"); String msgId = correlationData.getId(); msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS); } else { log.info("消息发送到Exchange失败, {}, cause: {}",correlationData,cause); } }); // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调 rabbitTemplate.setMandatory(true); // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{ log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message); }); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter converter(){ return new Jackson2JsonMessageConverter(); } public static final String MAIL_QUEUE_NAME = "mail.queue"; public static final String MAIL_EXCHANGE_NAME = "mail.exchange"; public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key"; //创建queue @Bean(name = "mailQueue") public Queue mailQueue(){ return new Queue(MAIL_QUEUE_NAME,true); } @Bean(name = "mailExchange") public DirectExchange mailExchange(){ return new DirectExchange(MAIL_EXCHANGE_NAME,true,false); } //绑定队列到交换机声明使用的routingkey /* @Bean public Binding mailBinding(){ Binding with = BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME); return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME); }*/ //绑定队列道交换机 @Bean public Binding queueExchange(@Qualifier("mailQueue")Queue queue, 
@Qualifier("mailExchange")Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(MAIL_ROUTING_KEY_NAME).noargs(); } }
/**
 * Records the mail in the message log and publishes it to the mail exchange.
 *
 * @param mail the mail to send; its msgId is overwritten with a newly generated id
 * @return a success response carrying the {@code MAIL_SEND_SUCCESS} message
 */
public ServerResponse send(Mail mail) {
    final String exchange = RabbitConfig.MAIL_EXCHANGE_NAME;
    final String routingKey = RabbitConfig.MAIL_ROUTING_KEY_NAME;

    // The same generated id is set on the mail, the log record and the CorrelationData.
    final String id = RandomUtil.UUID32();
    mail.setMsgId(id);

    // Store the message log before publishing.
    msgLogMapper.insert(new MsgLog(id, mail, exchange, routingKey));

    // Publish to the queue via the exchange.
    CorrelationData correlation = new CorrelationData(id);
    rabbitTemplate.convertAndSend(exchange, routingKey, MessageHelper.objToMsg(mail), correlation);

    return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
}
@Component @Slf4j public class SimpleMailConsumer { @Autowired private MsgLogService msgLogService; @Autowired private MailUtil mailUtil; @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME) public void consumer(Message message, Channel channel) throws IOException { Mail mail = MessageHelper.msgToObj(message, Mail.class); log.info("收到消息: {}",mail.toString()); String msgId = mail.getMsgId(); MsgLog msgLog = msgLogService.selectByMsgId(msgId); if (null ==msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)){ log.info("重复消费,msgId: {}",msgId); return; } MessageProperties properties = message.getMessageProperties(); long tag = properties.getDeliveryTag(); boolean success = mailUtil.send(mail); if (success){ msgLogService.updateStatus(msgId,Constant.MsgLogStatus.CONSUMED_SUCCESS); channel.basicAck(tag,false);//消费确认 }else { //未被ack的消息(unacked)会重新入队并被消费, 这样就保证了消息不会走丢。 channel.basicNack(tag,false,true); } } }
原作者地址
注:抄别人的,自己只做个记录
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。