当前位置:   article > 正文

RabbitMQ实现简单队列_rabbitmq创建队列命令

rabbitmq创建队列命令

一、直连交换机

1、配置信息

  1. spring:
  2. rabbitmq:
  3. host: 107.106.59.96
  4. port: 5672
  5. username: admin
  6. password: rabbit
  7. #消费者 手动确认配置项
  8. listener:
  9. type: simple
  10. # direct:
  11. # auto-startup: false 关闭自动配置
  12. simple:
  13. # auto-startup: false 关闭自动配置
  14. acknowledge-mode: MANUAL #消息确认方式 MANUAL手动确认 NONE不确认 AUTO自动确认
  15. retry:
  16. enabled: true #开启重试
  17. max-attempts: 5 #最大重试次数
  18. initial-interval: 500ms #重试间隔时间
  19. #生产者 消息确认配置项
  20. #确认消息已发送到交换机(Exchange)
  21. publisher-confirm-type: correlated
  22. #确认消息已发送到队列(Queue)
  23. publisher-returns: true
  24. template:
  25. mandatory: true

2、创建队列、交换机并绑定

  1. @Configuration
  2. public class DirectRabbitConfig {
  3. //队列 起名:buyOrderPhotoQueue
  4. @Bean
  5. public Queue buyOrderPhotoQueue() {
  6. return new Queue("buyOrderPhotoQueue",true);
  7. }
  8. //Direct交换机 起名:buyOrderPhotoExchange
  9. @Bean
  10. public DirectExchange buyOrderPhotoExchange() {
  11. return new DirectExchange("buyOrderPhotoExchange",true,false);
  12. }
  13. //绑定 将队列和交换机绑定, 并设置用于匹配键:buyOrderPhotoRouting
  14. @Bean
  15. public Binding bindingDirect(Queue buyOrderPhotoQueue, DirectExchange buyOrderPhotoExchange) {
  16. return BindingBuilder.bind(buyOrderPhotoQueue).to(buyOrderPhotoExchange).with("buyOrderPhotoRouting");
  17. }
  18. }

2、消费者

  1. @Component
  2. public class DirectReceiver {
  3. public static Logger log = org.apache.logging.log4j.LogManager.getLogger(DirectReceiver.class);
  4. @Value("${maxqps:10}")
  5. private Integer maxqps;
  6. @RabbitHandler
  7. @RabbitListener(queues = "buyOrderPhotoQueue")//监听的队列名称 buyOrderPhotoQueue
  8. public void process(String msg , Message message, Channel channel) {
  9. log.info("DirectReceiver消费者收到消息 : " + msg);
  10. Jedis jedis = null;
  11. String result = "";
  12. try {
  13. jedis = RedisUtil.getJedis();
  14. //判断是否超过最大并发
  15. boolean isPass = ablePass(jedis, maxqps);
  16. if(!isPass){//不放行
  17. //将消息放回队列
  18. channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  19. return;
  20. }
  21. JSONObject obj = new JSONObject(msg);
  22. String url = obj.optString("url");
  23. //业务代码实现
  24. ...
  25. } catch (Exception e){
  26. log.error(e);
  27. } finally {
  28. if(jedis != null){
  29. jedis.close();
  30. }
  31. }
  32. }
  33. /**
  34. * 是否可以继续请求
  35. * @param maxCount 最大请求数
  36. * @return 如果超过最大请求,则返回false,否则返回true
  37. */
  38. private boolean ablePass(Jedis jedis, int maxCount){
  39. log.info("ablePass");
  40. Long currentTime = System.currentTimeMillis();
  41. if (jedis.exists("buyOrderPhotoLimit")) {
  42. // intervalTime是限流的时间段
  43. Long intervalTime = 1000L;
  44. Long count = jedis.zcount("buyOrderPhotoLimit", currentTime - intervalTime, currentTime);
  45. log.info("count:" + count);
  46. if (count != null && count >= maxCount) {
  47. log.info("表格识别超过最大并发数");
  48. return false;
  49. }
  50. }
  51. jedis.zadd("buyOrderPhotoLimit", currentTime, UUID.randomUUID().toString());
  52. return true;
  53. }
  54. }

3、生产者

  1. public static String sendMsg(){
  2. JSONObject json = new JSONObject();
  3. json.put("id", 1);
  4. json.put("url", "xxx");
  5. json.put("yguid", "xxx");
  6. json.put("yeyid", 1);
  7. json.put("empid", 1);
  8. RabbitTemplate rabbitTemplate = SpringContextUtil.getBean(RabbitTemplate.class);
  9. CorrelationData correlationData = new CorrelationData();
  10. rabbitTemplate.convertAndSend("buyOrderPhotoExchange","buyOrderPhotoRouting", json.toString(), correlationData);
  11. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/734103
推荐阅读
相关标签
  

闽ICP备14008679号