赞
踩
官方教程 getstarted
https://www.rabbitmq.com/getstarted.html
Routing Receiving messages selectively
Topics Receiving messages based on a pattern (topics)
RPC Request/reply pattern
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
官网提示:https://www.erlang-solutions.com/resources/download.html
链接:https://pan.baidu.com/s/1w3zpv8iR1E5bs7m9EogYig
提取码:orqm
yum -y install esl-erlang_23.0.2-1_centos_7_amd64.rpm
erl
[root@seckill rabbitmq]# erl
Erlang/OTP 23 [erts-11.0.2] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [hipe]
Eshell V11.0.2 (abort with ^G)
官网下载地址:http://www.rabbitmq.com/download.html
链接:https://pan.baidu.com/s/1MZUispijy1F0Gh-EXJUrVA
提取码:5qdp
yum -y install rabbitmq-server-3.8.5-1.el7.noarch.rpm
rabbitmq-plugins enable rabbitmq_management
systemctl start rabbitmq-server.service
错误
{:query, :rabbit@seckill, {:badrpc, :timeout}}
方法:需要设置自己的hostname
[root@seckill rabbitmq]# hostnamectl
Static hostname: seckill
Icon name: computer-vm
[root@seckill rabbitmq]# vi /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.25.130 seckill
systemctl status rabbitmq-server.service
guest/guest 用户默认只可以localhost(本机)访问
localhost:15672
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5
/etc/rabbitmq/rabbitmq.config
在rabbitmq的配置文件目录下(默认为:/etc/rabbitmq)创建一个rabbitmq.config文件。
文件中添加如下配置(请不要忘记那个“.”):
[{rabbit, [{loopback_users, []}]}].
重启rabbitmq服务
重新访问
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
广播 全部queue都收到
依据绑定的路由key,转到到指定queue
依据路由规则,*,#通配符 *表示 1各或者多个 *表示0次或多次
匹配map中的值 match
package com.example.miaosha.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfigTest { private static final String QUEUE01="queue_fanout01"; private static final String QUEUE02="queue_fanout02"; private static final String EXCHANGE="fanoutExchange"; private static final String DIRCTQUEUE01="direct_queue01"; private static final String DIRCTQUEUE02="direct_queue02"; private static final String DIRCTEXCHANGE="direct_Exchange"; private static final String ROUTINGKEY01="queue.red"; private static final String ROUTINGKEY02="queue.green"; private static final String TOPICQUEUE01="topic_queue01"; private static final String TOPICQUEUE02="topic_queue02"; private static final String TOPICEXCHANGE="topic_Exchange"; private static final String TOPICROUTINGKEY01="#.queue.#"; private static final String TOPICROUTINGKEY02="*.queue.*"; private static final String HEADQUEUE01="head_queue01"; private static final String HEADQUEUE02="head_queue02"; private static final String HEADEXCHANGE="head_Exchange"; @Bean //生产者 消费者 public Queue queue(){ return new Queue("q1",true); } @Bean public Queue queue01(){ return new Queue(QUEUE01); } @Bean public Queue queue02(){ return new Queue(QUEUE02); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(EXCHANGE); } @Bean public Binding Binding01(){ return BindingBuilder.bind(queue01()).to(fanoutExchange()); } @Bean public Binding Binding02(){ return BindingBuilder.bind(queue02()).to(fanoutExchange()); } @Bean public Queue directQueue01(){ return new Queue(DIRCTQUEUE01); } @Bean public Queue directQueue02(){ return new Queue(DIRCTQUEUE02); } @Bean public DirectExchange directExchange(){ return new DirectExchange(DIRCTEXCHANGE); } @Bean public Binding directBinding01(){ return BindingBuilder.bind(directQueue01()).to(directExchange()).with(ROUTINGKEY01); } @Bean public Binding directBinding02(){ return BindingBuilder.bind(directQueue02()).to(directExchange()).with(ROUTINGKEY02); } @Bean public Queue topicQueue01(){ return new Queue(TOPICQUEUE01); } @Bean public Queue topicQueue02(){ return new Queue(TOPICQUEUE02); } @Bean public TopicExchange topicExchange(){ return new TopicExchange(TOPICEXCHANGE); } @Bean public Binding topicBinding01(){ return BindingBuilder.bind(topicQueue01()).to(topicExchange()).with(TOPICROUTINGKEY01); } @Bean public Binding topicBinding02(){ return BindingBuilder.bind(topicQueue02()).to(topicExchange()).with(TOPICROUTINGKEY02); } @Bean public Queue headQueue01(){ return new Queue(HEADQUEUE01); } @Bean public Queue headQueue02(){ return new Queue(HEADQUEUE02); } @Bean public HeadersExchange headExchange(){ return new HeadersExchange(HEADEXCHANGE); } @Bean public Binding headBinding01(){ Map<String,Object> map=new HashMap<>(); map.put("color","red"); map.put("speed","low"); return BindingBuilder.bind(headQueue01()).to(headExchange()).whereAll(map).match(); } @Bean public Binding headBinding02(){ Map<String,Object> map=new HashMap<>(); map.put("color","red"); map.put("speed","fast"); return BindingBuilder.bind(headQueue02()).to(headExchange()).whereAll(map).match(); } }
实际秒杀用到的TopicExchange配置
package com.example.miaosha.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { private static final String QUEUE="seckillQueue"; private static final String EXCHANGE="seckillExchange"; private static final String ROUTINGKEY="seckill.#"; @Bean public Queue seckillQueue(){ return new Queue(QUEUE); } @Bean public TopicExchange seckillExchange(){ return new TopicExchange(EXCHANGE); } @Bean public Binding binding(){ return BindingBuilder.bind(seckillQueue()).to(seckillExchange()).with(ROUTINGKEY); } }
通过 @RabbitListener(queues = “seckillQueue”) 绑定queue
通过fastjson 把 String message 转为SeckillMessage对象使用
package com.example.miaosha.rabbitmq; import com.alibaba.fastjson.JSON; import com.example.miaosha.pojo.SeckillMessage; import com.example.miaosha.pojo.SeckillOrder; import com.example.miaosha.pojo.User; import com.example.miaosha.service.IGoodsService; import com.example.miaosha.service.IOrderService; import com.example.miaosha.utils.JsonUtil; import com.example.miaosha.vo.GoodsVo; import com.example.miaosha.vo.RespBean; import com.example.miaosha.vo.RespBeanEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.RequestMapping; @Service @Slf4j public class MQReceiver { @Autowired private IGoodsService goodsService; @Autowired private RedisTemplate redisTemplate; @Autowired private IOrderService orderService; @RabbitListener(queues = "seckillQueue") public void receive(String message){ log.info("seckillQueue接受消息:"+message); SeckillMessage seckillMessage = JSON.parseObject(message, SeckillMessage.class); Long goodsId = seckillMessage.getGoodsId(); User user = seckillMessage.getUser(); //下单操作 GoodsVo goodsVo = goodsService.findGoodsVoById(goodsId); if(goodsVo.getStockCount()<1){ //如果有key,表示,没有库存,结束 redisTemplate.opsForValue().set("isStockEmpty:"+goodsId,"0"); return; } //1. 判断是否重复抢购 ValueOperations valueOperations = redisTemplate.opsForValue(); SeckillOrder seckillOrder = (SeckillOrder)valueOperations.get("order:" + user.getId() + ":" + goodsId); if(null!=seckillOrder){ return; } //重复抢购 orderService.seckill(goodsVo,user); } /* @RabbitListener(queues = "q1") public void receive(Object msg){ log.info("接受消息:"+msg); } @RabbitListener(queues = "queue_fanout01") public void receive1(Object msg){ log.info("queue_fanout01接受消息:"+msg); } @RabbitListener(queues = "queue_fanout02") public void receive2(Object msg){ log.info("queue_fanout02接受消息:"+msg); } @RabbitListener(queues = "direct_queue01") public void receive3(Object msg){ log.info("direct_queue01接受消息:"+msg); } @RabbitListener(queues = "direct_queue02") public void receive4(Object msg){ log.info("direct_queue02接受消息:"+msg); } @RabbitListener(queues = "topic_queue01") public void receive5(Message msg){ log.info("topic_queue01接受消息:"+msg); } @RabbitListener(queues = "topic_queue02") public void receive6(Message msg){ log.info("topic_queue02接受消息:"+msg); } @RabbitListener(queues = "head_queue01") public void receive7(Message msg){ log.info("head_queue01接受消息:"+new String(msg.getBody())); } @RabbitListener(queues = "head_queue02") public void receive8(Message msg){ log.info("head_queue02接受消息:"+new String(msg.getBody())); } */ }
注入 rabbitTemplate
exchangename:seckillExchange
routerkey:seckill.message
message string
rabbitTemplate.convertAndSend(“seckillExchange”,“seckill.message”,message);
package com.example.miaosha.rabbitmq; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @Slf4j public class MQSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendSeckillMessage(String message){ log.info("sendSeckill发送消息:"+message); rabbitTemplate.convertAndSend("seckillExchange","seckill.message",message); } /* public void send(Object msg){ log.info("发送消息:"+msg); rabbitTemplate.convertAndSend("q1",msg); } public void sendFanout(Object msg){ log.info("fanout发送消息:"+msg); rabbitTemplate.convertAndSend("fanoutExchange","",msg); } public void sendDirect01(Object msg){ log.info("directExchange发送消息:"+msg); rabbitTemplate.convertAndSend("direct_Exchange","queue.red",msg); } public void sendDirect02(Object msg){ log.info("directExchange发送消息:"+msg); rabbitTemplate.convertAndSend("direct_Exchange","queue.green",msg); } public void sendTopic01(Object msg){ log.info("topic_Exchange发送消息:"+msg); rabbitTemplate.convertAndSend("topic_Exchange","aaa.queue.bbb",msg); } public void sendTopic02(Object msg){ log.info("topic_Exchange发送消息:"+msg); rabbitTemplate.convertAndSend("topic_Exchange","queue.red.message",msg); } public void sendHead01(String msg){ log.info("topic_Exchange发送消息:"+msg); MessageProperties properties=new MessageProperties(); properties.setHeader("color","red"); properties.setHeader("speed","low"); Message message=new Message(msg.getBytes(),properties); rabbitTemplate.convertAndSend("head_Exchange","",message); } public void sendHead02(String msg){ log.info("topic_Exchange发送消息:"+msg); MessageProperties properties=new MessageProperties(); properties.setHeader("color","red"); properties.setHeader("speed","fast"); Message message=new Message(msg.getBytes(),properties); rabbitTemplate.convertAndSend("head_Exchange","",message); } */ }
@Controller @Slf4j @RequestMapping("/seckill") public class SeckillController implements InitializingBean { @Autowired private MQSender mqSender; //秒杀静态化,在商品详情页,直接ajax请求秒杀,成功后,跳转秒杀成功静态页面 @RequestMapping(value = "/{path}/doSeckill",method = RequestMethod.POST) @ResponseBody public RespBean doSeckill(Model model, User user,Long goodsId,@PathVariable String path) { //下单 SeckillMessage seckillMessage = new SeckillMessage(user, goodsId); mqSender.sendSeckillMessage(JSON.toJSONString(seckillMessage)); //正在排队中 0 return RespBean.success(0); } }
消息对象
package com.example.miaosha.pojo;
import com.example.miaosha.vo.GoodsVo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SeckillMessage {
private User user;
private Long goodsId;
}
package com.example.miaosha.controller; import com.example.miaosha.pojo.User; import com.example.miaosha.rabbitmq.MQSender; import com.example.miaosha.vo.RespBean; import com.example.miaosha.vo.RespBeanEnum; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.ResponseBody; /** * <p> * 前端控制器 * </p> * * @author cch * @since 2021-11-14 */ @Controller @RequestMapping("/user") public class UserController { @Autowired private MQSender mqSender; @RequestMapping("/info") @ResponseBody public RespBean info(User user){ return RespBean.success(user); } @RequestMapping("/mq") @ResponseBody public void mq(){ mqSender.send("hello"); } @RequestMapping("/mq/fanout") @ResponseBody public void fanout(){ mqSender.sendFanout("hello"); } @RequestMapping("/mq/direct01") @ResponseBody public void direct01(){ mqSender.sendDirect01("hello"); } @RequestMapping("/mq/direct02") @ResponseBody public void direct02(){ mqSender.sendDirect02("hello"); } @RequestMapping("/mq/topic01") @ResponseBody public void topic01(){ mqSender.sendTopic01("hello"); } @RequestMapping("/mq/topic02") @ResponseBody public void topic02(){ mqSender.sendTopic02("hello"); } @RequestMapping("/mq/header01") @ResponseBody public void head01(){ mqSender.sendHead01("hello red low"); } @RequestMapping("/mq/header02") @ResponseBody public void head02(){ mqSender.sendHead02("hello red fast"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。