赞
踩
一、直连交换机
1、配置信息
- spring:
- rabbitmq:
- host: 107.106.59.96
- port: 5672
- username: admin
- password: rabbit
- #消费者 手动确认配置项
- listener:
- type: simple
- # direct:
- # auto-startup: false 关闭自动配置
- simple:
- # auto-startup: false 关闭自动配置
- acknowledge-mode: MANUAL #消息确认方式 MANUAL手动确认 NONE不确认 AUTO自动确认
- retry:
- enabled: true #开启重试
- max-attempts: 5 #最大重试次数
- initial-interval: 500ms #重试间隔时间
- #生产者 消息确认配置项
- #确认消息已发送到交换机(Exchange)
- publisher-confirm-type: correlated
- #确认消息已发送到队列(Queue)
- publisher-returns: true
- template:
- mandatory: true
2、创建队列、交换机并绑定
- @Configuration
- public class DirectRabbitConfig {
-
- //队列 起名:buyOrderPhotoQueue
- @Bean
- public Queue buyOrderPhotoQueue() {
- return new Queue("buyOrderPhotoQueue",true);
- }
-
- //Direct交换机 起名:buyOrderPhotoExchange
- @Bean
- public DirectExchange buyOrderPhotoExchange() {
- return new DirectExchange("buyOrderPhotoExchange",true,false);
- }
-
- //绑定 将队列和交换机绑定, 并设置用于匹配键:buyOrderPhotoRouting
- @Bean
- public Binding bindingDirect(Queue buyOrderPhotoQueue, DirectExchange buyOrderPhotoExchange) {
- return BindingBuilder.bind(buyOrderPhotoQueue).to(buyOrderPhotoExchange).with("buyOrderPhotoRouting");
- }
-
- }
2、消费者
- @Component
- public class DirectReceiver {
- public static Logger log = org.apache.logging.log4j.LogManager.getLogger(DirectReceiver.class);
-
- @Value("${maxqps:10}")
- private Integer maxqps;
-
- @RabbitHandler
- @RabbitListener(queues = "buyOrderPhotoQueue")//监听的队列名称 buyOrderPhotoQueue
- public void process(String msg , Message message, Channel channel) {
- log.info("DirectReceiver消费者收到消息 : " + msg);
- Jedis jedis = null;
- String result = "";
- try {
- jedis = RedisUtil.getJedis();
- //判断是否超过最大并发
- boolean isPass = ablePass(jedis, maxqps);
- if(!isPass){//不放行
- //将消息放回队列
- channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
- return;
- }
- JSONObject obj = new JSONObject(msg);
- String url = obj.optString("url");
- //业务代码实现
- ...
- } catch (Exception e){
- log.error(e);
- } finally {
- if(jedis != null){
- jedis.close();
- }
- }
- }
-
- /**
- * 是否可以继续请求
- * @param maxCount 最大请求数
- * @return 如果超过最大请求,则返回false,否则返回true
- */
- private boolean ablePass(Jedis jedis, int maxCount){
- log.info("ablePass");
- Long currentTime = System.currentTimeMillis();
- if (jedis.exists("buyOrderPhotoLimit")) {
- // intervalTime是限流的时间段
- Long intervalTime = 1000L;
- Long count = jedis.zcount("buyOrderPhotoLimit", currentTime - intervalTime, currentTime);
- log.info("count:" + count);
- if (count != null && count >= maxCount) {
- log.info("表格识别超过最大并发数");
- return false;
- }
- }
- jedis.zadd("buyOrderPhotoLimit", currentTime, UUID.randomUUID().toString());
- return true;
- }
-
- }
3、生产者
- public static String sendMsg(){
- JSONObject json = new JSONObject();
- json.put("id", 1);
- json.put("url", "xxx");
- json.put("yguid", "xxx");
- json.put("yeyid", 1);
- json.put("empid", 1);
- RabbitTemplate rabbitTemplate = SpringContextUtil.getBean(RabbitTemplate.class);
- CorrelationData correlationData = new CorrelationData();
- rabbitTemplate.convertAndSend("buyOrderPhotoExchange","buyOrderPhotoRouting", json.toString(), correlationData);
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。