赞
踩
以提供者、消费者为例、
准备 :
创建Springboot项目 消费、提供模块、配置yml文件、导入依赖
Yaml文件
- server:
- port: 1234
- spring:
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
Pom依赖
<dependencies> <!-- rabbitmq 的依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
提供者:
Yaml文件
- server:
- port: 12345
- spring:
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
提供者 建立一个消息的发送接口:
- public interface IMessageProducerService {
-
- public void sendMessage1(String msg);
-
- public void sendMessage2(Map<String,Object> map);
-
- public void sendMessage3(String msg);
-
- public void sendDelayedMessage(String msg);
- }
再建立一个接口的实现类
- @Component
- @Service
- public class MessageProducerServiceImpl implements IMessageProducerService{
- @Resource
- private RabbitTemplate template;
-
- private void sendMessage(String exchange,String routingKey,Object message){
- //exchange 交换机的名字、routingkey 路由键、message 发送的消息
- //提供者发送 消息到交换机,不是消息队列、那么交换机、路由键 必须存在、否则报错
- //交换机和队列每次修改**持久化**和**自动删除**等属性都要**先删除**原来交换机和队列,
- //否则启动就异常,报创建**队列**失败
- this.template.convertAndSend(exchange,routingKey,message);
- }
-
- @Override
- public void sendMessage1(String msg) {
- this.sendMessage("dx","rk",msg);
- }
-
- @Override
- public void sendMessage2(Map<String, Object> map) {
- this.sendMessage(
- "myfan","",map);
- }
-
- @Override
- public void sendMessage3(String msg) {
- this.sendMessage("mytopic","a.b.log.dao",msg);
- this.sendMessage("mytopic","log.dao",msg);
- this.sendMessage("mytopic","log.d.c",msg);
- this.sendMessage("mytopic","a.log.c",msg);
- }
- }
消费者通过代码创建 交换机、消息队列、路由键、设置消息队列过期时间
DirectExchange 直连模式、FanoutExchange 广播模式、TopicExchange、
- @Configuration
- //延时队列只能通过代码创建绑定或Web端手动创建绑定
- public class ConsumerConfiger {
-
- public static final String EXCHANGE = "dx"; // 交换空间名称
- public static final String ROUTINGKEY = "rk"; // 设置路由key
-
-
- //1、以直连的方式创建
- @Primary
- @Bean
- public DirectExchange directExchange() {
- // 要创建的交换机
- // 持久化(存硬盘,重启还在,默认true)
- // 自动删除(当所有绑定队列都不在使用时删除交换机)
- return new DirectExchange(EXCHANGE, true, false);
- }
- @Bean
- public Queue queue11() {
- // 要创建的队列
- // 持久化(保存硬盘,重启存在,默认true)
- // 排他(只有创建它的连接可用,连接关闭时无论队列中有没有消息会删除队列)
- // 自动删除(当队列中有消息时,无论是否排他,关闭连接都不会删除队列,此时消费者消费完消息后再断开消费者,队列会被自动删除,删除后队列消息也丢失)
- // args参数设置队列最多5条消息(保留最后的5条消息,前面的全部丢失)
- return new Queue("q11",true,true,false,args);
- }
-
- @Bean
- public Queue queue12() {
- // args参数设置队列过期时间10秒,10秒后所有消息清空
- Map<String,Object> args = new HashMap<>();
- args.put("x-message-ttl",10000);
- return new Queue("q12", true,false,false,args);
- }
-
- //1、看binding 就知道是绑定的意思、绑定 exchange 交换机 下面的 queue 消息队列 下面的
- // routingkey
- @Bean
- public Binding bindingExchangeQueue11(DirectExchange exchange, Queue queue11) {
- return BindingBuilder.bind(queue1).to(exchange).with(ROUTINGKEY);
- }
- @Bean
- public Binding bindingExchangeQueue12(DirectExchange exchange, Queue queue12) {
- return BindingBuilder.bind(queue2).to(exchange).with(ROUTINGKEY);
- }
-
-
- //2、以广播模式创建、
- /**
- * FanoutExchange 广播模式
- * @return
- */
- @Bean
- public FanoutExchange getFanoutExchange(){
- return new FanoutExchange("myfan");
- }
- @Bean
- public Queue queue21() {
- return new Queue("q21");
- }
- @Bean
- public Queue queue22() {
- return new Queue("q22");
- }
- @Bean
- public Binding bindingExchangeQueue21(FanoutExchange exchange, Queue queue21) {
- return BindingBuilder.bind(queue21).to(exchange);
- }
- @Bean
- public Binding bindingExchangeQueue22(FanoutExchange exchange, Queue queue22) {
- return BindingBuilder.bind(queue22).to(exchange);
- }
-
-
- //3、TopicExchange
- // "*"用于匹配一个单词,比如"a","abc"等;
- // "#"用于匹配0个或者多个单词,比如"", "abc", "abc.def"等
- @Bean
- public TopicExchange getTopicExchange(){
- return new TopicExchange("topic");
- }
- @Bean
- public Queue queue31() {
- return new Queue("q31");
- }
- @Bean
- public Queue queue32() {
- return new Queue("q32");
- }
- @Bean
- public Binding bindingExchangeQueue31(TopicExchange exchange, Queue queue31) {
- return BindingBuilder.bind(queue31).to(exchange).with("#.log.*");
- }
- @Bean
- public Binding bindingExchangeQueue32(TopicExchange exchange, Queue queue32) {
- return BindingBuilder.bind(queue32).to(exchange).with("*.log.#");
- }
-
- }
消费者实现监听处理类:、负责接收提供者发送的消息、
- @Service
- public class MessageConsumer {
-
- @RabbitListener(queues = "q11")
- public void receiveMessage1(String msg){
- System.out.println("【*** 接收消息11 ***】:"+msg);
- }
-
- //注解方式绑定交换机和队列 无需创建配置类、
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = "q11"),
- exchange = @Exchange(value = "dx",type = ExchangeTypes.DIRECT)
- ))
- //public void receiveMessage(String msg) { // 进行消息接收处理
- //System.out.println("【*** q1接收消息 ***】" + msg);
- //}
-
-
- @RabbitListener(queues = "q12")
- public void receiveMessage2(String msg){
- System.out.println("【*** 接收消息12 ***】:"+msg);
- }
-
- @RabbitListener(queues = {"q21","q22"})
- public void receiveMessage212(Map<String, Object> map){
- System.out.println("q21-22:"+map);
- }
- }
最后一步、提供者创建Test测试类、或者是像我一样创建控制层、调用方法使用
- @RestController
- public class MessageController {
-
- @Resource
- private IMessageProducerService service;
-
- @Resource
- private RabbitTemplate template;
-
-
-
- @RequestMapping("/test1")
- public void testMessage1() throws InterruptedException {
- service.sendMessage1("我发的第一条消息!"); 异常问题待会说
- /*
- for (int i=0;i<100;i++){ //有5条多余+10条超时的进入死信队列
- Thread.sleep(100);
- this.service.sendMessage1("直连消息:"+i);
- }
- */
- System.out.println("消息发送完毕");
- }
-
- @RequestMapping("/test2")
- public void testMessage2() {
- Map<String,Object> map = new HashMap<>();
- map.put("sid",1);
- map.put("nums",2);
- this.service.sendMessage2(map);
- System.out.println("消息发送完毕");
- }
-
- @RequestMapping("/test3")
- public void testMessage3() {
- this.service.sendMessage3("我发的topic交换机消息!");
- System.out.println("消息发送完" +
- "毕");
- }
-
-
- @RequestMapping("/test4")
- public void testMessage4() {
- this.service.sendDelayedMessage("我发的延时交换机消息!");
- System.out.println("消息发送完" +
- "毕");
- }
- }
以上是最基本的RabbitMq的使用 、如果有异常会出现 消息队列没有接收到消息、导致方法无线循环、我们可以配置 出现异常、只出现指定次数、
消费者配置yml 文件:
- spring:
- rabbitmq:
- #publisher-confirms: true # 发送确认 版本似乎不再支持
- #publisher-returns: true # 路由失败回调
- template:
- # 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
- mandatory: true
- listener:
- simple:
- retry:
- enabled: true
- max-attempts: 3 # 重试3次
- prefetch: 1 # 每次从RabbitMQ获取的消息数量
- default-requeue-rejected: false
- concurrency: 1 # 每个队列启动的消费者数量
- max-concurrency: 5 # 每个队列最大的消费者数量
- acknowledge-mode: manual # 签收模式为手动签收,需要在代码中手动ACK
在消费者的监听处理类 随便找个方法 加上 int i = 10 / 0; 就进入异常、最后发现最多执行三次异常、
接下来: RabbitMq延时队列的使用
消费者配置类
- @Configuration
- public class ConsumerConfig {
- /**
- *
- * 延迟交换机(x-delayed-type、x-delayed-message固定写法)
- */
- @Bean("delayExchange")
- public CustomExchange delayExchange(){
- Map<String,Object> args = new HashMap<>();
- //x-delayed-type 固定写法
- args.put("x-delayed-type","direct");
- return new CustomExchange("delay.exchange","x-delayed-message",true,true,args);
- }
-
- @Bean("delayQueue")
- public Queue delayQueue(){
- return new Queue("delay.queue",true,false,false);
- }
-
- @Bean
- public Binding bindingDelayExchangeQueue(CustomExchange delayExchange, Queue delayQueue){
- return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.routkey").noargs();
- }
- }
消费者监听消息队列类
- @Component
- public class MessageConsumer {
- /**
- * 延时队列
- * @param channel
- * @param message
- */
- @RabbitListener(queues = "delay.queue")
- public void receiveDelay(Channel channel, Message message){
- System.out.println("receiveDelay:" + message);
- System.out.println("delay:" + message.getMessageProperties());
- System.out.println("message body:"+new String(message.getBody()));
- }
-
- /**
- * 直接接收延时消息
- */
- @RabbitListener(queues = {"delay.queue"})
- public void receiveDelay(String msg){
- System.out.println("delay.queue:"+msg);
- }
- }
提供者:发送类
- @Component
- @Service
- public class MessageProducerServiceImpl implements IMessageProducerService{
- @Resource
- private RabbitTemplate template;
-
-
- /**
- * 发延时队列
- * @param msg
- */
- @Override
- public void sendDelayedMessage(String msg){
- //延时队列 一定要下载最新对应的ez文件放入plugs 文件夹里,版本不同、没下载都无法生效
- template.convertAndSend("delay.exchange","delay.routkey", msg, message -> {
- // 设置过期时间
- message.getMessageProperties().setDelay(5000);
- return message;
- });
- }
这就是延时队列的使用、
往下走—— 死信队列
- //代码创建交换机队列并绑定
- @Configuration
- public class MessageConfig {
-
- /**
- * 直连
- * @return
- */
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange("direct",true,true);
- }
-
- @Bean
- public Queue queue11(){
- Map<String,Object> args = new HashMap<>();
- args.put("x-max-length",5); //超过5个的进入死信队列 像我发送100次请求,那么到了q11 6个请求,有一个会进入死信队列,超过了的都得进去
- args.put("x-dead-letter-exchange","deadExchange");
- args.put("x-dead-letter-routing-key","deadRouting");
- return new Queue("q11",true,false,false,args);
- }
-
- @Bean
- public Queue queue12(){
- Map<String,Object> args = new HashMap<>();
- args.put("x-message-ttl",10000); //超过10秒未消费的进入死信队列
- args.put("x-dead-letter-exchange","deadExchange");
- args.put("x-dead-letter-routing-key","deadRouting");
- return new Queue("q12",true,false,false,args);
- }
-
- //死信交换机
- @Bean
- public DirectExchange deadExchange(){
- return new DirectExchange("dead.exchange",true,false);
- }
-
- //死信队列
- @Bean
- public Queue deadQueue(){
- Map<String,Object> args = new HashMap<>();
- args.put("x-dead-letter-exchange","deadExchange");
- args.put("x-dead-letter-routing-key","deadRouting");
- return new Queue("dead.Queue",true,false,false,args);
- }
-
- @Bean
- public Binding bindingExchangeQueue13() {
- return BindingBuilder.bind(deadQueue())
- .to(deadExchange()).with("deadRouting");
- }
-
- @Bean
- public Binding bindingExchangeQueue11(DirectExchange directExchange, Queue queue11) {
- return BindingBuilder.bind(queue11)
- .to(directExchange).with("rk");
- }
-
- @Bean
- public Binding bindingExchangeQueue12(DirectExchange directExchange, Queue queue12) {
- return BindingBuilder.bind(queue12)
- .to(directExchange).with("rk");
- }
-
- }
提供者发送消息:
- @RestController
- public class MessageController {
-
- @Resource
- private IMessageProducerService service;
-
- @Resource
- private RabbitTemplate template;
-
-
- @RequestMapping("/test1")
- public void testMessage1() throws InterruptedException {
- // service.sendMessage1("我发的第一条消息!");
- for (int i=0;i<100;i++){ //有5条多余+10条超时的进入死信队列
- Thread.sleep(100);
- this.service.sendMessage1("直连消息:"+i);
- }
- System.out.println("消息发送完毕");
- }
-
- @RequestMapping("/test2")
- public void testMessage2() {
- Map<String,Object> map = new HashMap<>();
- map.put("sid",1);
- map.put("nums",2);
- this.service.sendMessage2(map);
- System.out.println("消息发送完毕");
- }
-
- @RequestMapping("/test3")
- public void testMessage3() {
- this.service.sendMessage3("我发的topic交换机消息!");
- System.out.println("消息发送完" +
- "毕");
- }
-
-
- @RequestMapping("/test4")
- public void testMessage4() {
- this.service.sendDelayedMessage("我发的延时交换机消息!");
- System.out.println("消息发送完" +
- "毕");
- }
- }
这就是RabbitMq的一个简单的使用,如果发现我有问题,请一定要指出来,我需要进步,我愿意改正
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。