当前位置:   article > 正文

sprinboot 集成rqbbitmq延迟推送消息_rqbbitmq exception:invalid amqp data

rqbbitmq exception:invalid amqp data

1.jar包和配置文件之前文章已经写过,这里不再重复写了,地址:https://blog.csdn.net/shy415502155/article/details/92787814

2. 延迟推送队列配置

  1. import java.util.HashMap;
  2. import java.util.Map;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.CustomExchange;
  6. import org.springframework.amqp.core.Queue;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import com.shy.springboot.utils.Constant;
  10. /***
  11. * @Title: springstarter
  12. * @author shy
  13. * @Description 延迟推送队列
  14. *
  15. */
  16. @Configuration
  17. public class DelayConfig {
  18. @Bean
  19. public CustomExchange delayTestExchange() {
  20. Map<String, Object> args = new HashMap<>();
  21. args.put(Constant.DELAY_ARGS_KEY, Constant.DELAY_ARGS_VALUE);
  22. return new CustomExchange(Constant.DELAY_TEST_EXCHANGE, Constant.X_DELAYED_MESSAGE,true, false,args);
  23. }
  24. @Bean
  25. public Queue testQueue() {
  26. Queue queue = new Queue(Constant.DELAY_TEST_QUEUE, true);
  27. return queue;
  28. }
  29. @Bean
  30. public Binding testBinding() {
  31. return BindingBuilder.bind(testQueue()).to(delayTestExchange()).with(Constant.DELAY_TEST_QUEUE).noargs();
  32. }
  33. /***
  34. * 延迟队列 delay_update_user_queue 交换器 delay_update_user_exchange
  35. *
  36. * @return
  37. */
  38. @Bean
  39. public CustomExchange delayUpdateUserExchange() {
  40. Map<String, Object> args = new HashMap<>();
  41. args.put(Constant.DELAY_ARGS_KEY, Constant.DELAY_ARGS_VALUE);
  42. return new CustomExchange(Constant.DELAY_UPDATE_USER_EXCHANGE, Constant.X_DELAYED_MESSAGE,true, false,args);
  43. }
  44. @Bean
  45. public Queue updateUserQueue() {
  46. Queue queue = new Queue(Constant.DELAY_UPDATE_USER_QUEUE, true);
  47. return queue;
  48. }
  49. @Bean
  50. public Binding updateUserBinding() {
  51. return BindingBuilder.bind(updateUserQueue()).to(delayUpdateUserExchange()).with(Constant.DELAY_UPDATE_USER_QUEUE).noargs();
  52. }
  53. }

3. 延迟推送队列消费

  1. import java.io.IOException;
  2. import java.text.ParseException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. import java.util.Map;
  6. import javax.annotation.Resource;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Component;
  9. import com.shy.springboot.dao.UserDao;
  10. import com.shy.springboot.entity.User;
  11. import com.shy.springboot.utils.Constant;
  12. import lombok.extern.slf4j.Slf4j;
  13. @Slf4j
  14. @Component
  15. //@RabbitListener(queues = "delayqueue")
  16. public class ConsumeDelay {
  17. @Resource
  18. private UserDao userDao;
  19. //@RabbitHandler
  20. @RabbitListener(queues = Constant.DELAY_UPDATE_USER_QUEUE)
  21. public void process(User user) throws ParseException, IOException{
  22. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  23. user.setUser_id(1);
  24. int update = userDao.update(user);
  25. log.info("更新用户信息,更新时间为 :" + sdf.format(new Date()));
  26. }
  27. @RabbitListener(queues = Constant.DELAY_TEST_QUEUE)
  28. public void processTest(Map<String, Object> map) throws ParseException{
  29. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  30. log.info("消息接受时间》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》:" + sdf.format(new Date()));
  31. log.info("接受的信息》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》:" + map.get("msg"));
  32. }
  33. }

4. 发起请求

  1. @RequestMapping(value = "/testmq")
  2. @ResponseBody
  3. public Map<String, Object> consumeMq() throws ParseException {
  4. User user = new User();
  5. user.setUser_name("MQ_" + CommonUtils.getRandomStr(6, 1));
  6. user.setUser_pass(CommonUtils.getRandomStr(12, 1));
  7. user.setSex(1);
  8. user.setToken(null);
  9. String birthday = "1999-08-01";
  10. Date birthdayDate = CommonUtils.stringToDate(birthday, "yyyy-MM-dd");
  11. user.setBirthday(birthdayDate);
  12. user.setCreate_time(new Date());
  13. user.setUpdate_time(new Date());
  14. user.setIs_del(Constant.IS_DEL_NO);
  15. Map<String, Object> returnMap = new HashMap<String, Object>();
  16. log.info("请求发起时间 》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》 :" + CommonUtils.dateFormat(new Date(), null));
  17. delayrabbitTemplate.convertAndSend(Constant.DELAY_UPDATE_USER_EXCHANGE, Constant.DELAY_UPDATE_USER_QUEUE, user,
  18. new MessagePostProcessor() {
  19. @Override
  20. public Message postProcessMessage(Message message) throws AmqpException {
  21. message.getMessageProperties().setHeader(Constant.DELAY_HEADER_KEY, 60000);
  22. return message;
  23. }
  24. });
  25. String msg = "進入fanout.test隊列,即將被消費... ";
  26. rabbitTemplate.convertAndSend("testFanoutExchange", "fanout.test", msg);
  27. // 發送DELAY_TEST_QUEUE
  28. Map<String, Object> map = new HashMap<String, Object>();
  29. map.put("msg", "即將往 DELAY_TEST_QUEUE 隊列中發送消息,请求发起时间 :" + CommonUtils.dateFormat(new Date(), null));
  30. delayrabbitTemplate.convertAndSend(Constant.DELAY_TEST_EXCHANGE, Constant.DELAY_TEST_QUEUE, map,
  31. new MessagePostProcessor() {
  32. @Override
  33. public Message postProcessMessage(Message message) throws AmqpException {
  34. message.getMessageProperties().setHeader(Constant.DELAY_HEADER_KEY, 60000);
  35. return message;
  36. }
  37. });
  38. return returnMap;
  39. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/808723
推荐阅读
相关标签
  

闽ICP备14008679号