赞
踩
1.jar包和配置文件之前文章已经写过,这里不再重复写了,地址:https://blog.csdn.net/shy415502155/article/details/92787814
2. 延迟推送队列配置
- import java.util.HashMap;
- import java.util.Map;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.CustomExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import com.shy.springboot.utils.Constant;
-
- /***
- * @Title: springstarter
- * @author shy
- * @Description 延迟推送队列
- *
- */
- @Configuration
- public class DelayConfig {
-
- @Bean
- public CustomExchange delayTestExchange() {
- Map<String, Object> args = new HashMap<>();
- args.put(Constant.DELAY_ARGS_KEY, Constant.DELAY_ARGS_VALUE);
- return new CustomExchange(Constant.DELAY_TEST_EXCHANGE, Constant.X_DELAYED_MESSAGE,true, false,args);
- }
-
- @Bean
- public Queue testQueue() {
- Queue queue = new Queue(Constant.DELAY_TEST_QUEUE, true);
- return queue;
- }
-
- @Bean
- public Binding testBinding() {
- return BindingBuilder.bind(testQueue()).to(delayTestExchange()).with(Constant.DELAY_TEST_QUEUE).noargs();
- }
-
- /***
- * 延迟队列 delay_update_user_queue 交换器 delay_update_user_exchange
- *
- * @return
- */
-
- @Bean
- public CustomExchange delayUpdateUserExchange() {
- Map<String, Object> args = new HashMap<>();
- args.put(Constant.DELAY_ARGS_KEY, Constant.DELAY_ARGS_VALUE);
- return new CustomExchange(Constant.DELAY_UPDATE_USER_EXCHANGE, Constant.X_DELAYED_MESSAGE,true, false,args);
- }
-
- @Bean
- public Queue updateUserQueue() {
- Queue queue = new Queue(Constant.DELAY_UPDATE_USER_QUEUE, true);
- return queue;
- }
-
- @Bean
- public Binding updateUserBinding() {
- return BindingBuilder.bind(updateUserQueue()).to(delayUpdateUserExchange()).with(Constant.DELAY_UPDATE_USER_QUEUE).noargs();
- }
- }

3. 延迟推送队列消费
- import java.io.IOException;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.Map;
-
- import javax.annotation.Resource;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import com.shy.springboot.dao.UserDao;
- import com.shy.springboot.entity.User;
- import com.shy.springboot.utils.Constant;
- import lombok.extern.slf4j.Slf4j;
- @Slf4j
- @Component
- //@RabbitListener(queues = "delayqueue")
- public class ConsumeDelay {
-
- @Resource
- private UserDao userDao;
-
- //@RabbitHandler
- @RabbitListener(queues = Constant.DELAY_UPDATE_USER_QUEUE)
- public void process(User user) throws ParseException, IOException{
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- user.setUser_id(1);
- int update = userDao.update(user);
- log.info("更新用户信息,更新时间为 :" + sdf.format(new Date()));
- }
-
- @RabbitListener(queues = Constant.DELAY_TEST_QUEUE)
- public void processTest(Map<String, Object> map) throws ParseException{
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- log.info("消息接受时间》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》:" + sdf.format(new Date()));
- log.info("接受的信息》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》:" + map.get("msg"));
- }
- }

4. 发起请求
- @RequestMapping(value = "/testmq")
- @ResponseBody
- public Map<String, Object> consumeMq() throws ParseException {
- User user = new User();
- user.setUser_name("MQ_" + CommonUtils.getRandomStr(6, 1));
- user.setUser_pass(CommonUtils.getRandomStr(12, 1));
- user.setSex(1);
- user.setToken(null);
- String birthday = "1999-08-01";
- Date birthdayDate = CommonUtils.stringToDate(birthday, "yyyy-MM-dd");
- user.setBirthday(birthdayDate);
- user.setCreate_time(new Date());
- user.setUpdate_time(new Date());
- user.setIs_del(Constant.IS_DEL_NO);
- Map<String, Object> returnMap = new HashMap<String, Object>();
- log.info("请求发起时间 》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》 :" + CommonUtils.dateFormat(new Date(), null));
- delayrabbitTemplate.convertAndSend(Constant.DELAY_UPDATE_USER_EXCHANGE, Constant.DELAY_UPDATE_USER_QUEUE, user,
- new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setHeader(Constant.DELAY_HEADER_KEY, 60000);
- return message;
- }
- });
- String msg = "進入fanout.test隊列,即將被消費... ";
- rabbitTemplate.convertAndSend("testFanoutExchange", "fanout.test", msg);
- // 發送DELAY_TEST_QUEUE
- Map<String, Object> map = new HashMap<String, Object>();
- map.put("msg", "即將往 DELAY_TEST_QUEUE 隊列中發送消息,请求发起时间 :" + CommonUtils.dateFormat(new Date(), null));
- delayrabbitTemplate.convertAndSend(Constant.DELAY_TEST_EXCHANGE, Constant.DELAY_TEST_QUEUE, map,
- new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setHeader(Constant.DELAY_HEADER_KEY, 60000);
- return message;
- }
- });
- return returnMap;
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。