当前位置:   article > 正文

rabbitmq 工作模式(未完全版只demo)

rabbitmq 工作模式(未完全版只demo)

1,导入依赖

在 pom.xml 中添加 Spring AMQP(RabbitMQ)的起步依赖:

<!-- Spring Boot starter for AMQP; the Java snippets in this article import their
     RabbitTemplate and @RabbitListener types from the org.springframework.amqp packages. -->
<!-- NOTE(review): no <version> is declared here; presumably it is managed by the
     project's Spring Boot parent/BOM - confirm in the full pom.xml. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

2, 工作模式

注意:15672 是访问 RabbitMQ Web 管理界面的端口;应用程序连接 MQ 收发消息应使用 5672(AMQP)端口。
Linux 防火墙需要开放这两个端口,并在添加规则后执行 firewall-cmd --reload(或重启防火墙)使其生效:
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
1、启动服务:systemctl start firewalld.service
2、关闭服务:systemctl stop firewalld.service
3、重启服务:systemctl restart firewalld.service
4、显示服务的状态:systemctl status firewalld.service
5、开机自动启动:systemctl enable firewalld.service
6、禁用开机自动启动:systemctl disable firewalld.service
7、查看版本: firewall-cmd --version
8、查看帮助: firewall-cmd --help
9、显示状态: firewall-cmd --state
10、查看所有打开的端口: firewall-cmd --zone=public --list-ports
11、更新防火墙规则: firewall-cmd --reload
12、查看区域信息: firewall-cmd --get-active-zones
13、查看指定接口所属区域: firewall-cmd --get-zone-of-interface=eth0
14、拒绝所有包:firewall-cmd --panic-on
15、取消拒绝状态: firewall-cmd --panic-off
16、查看是否拒绝: firewall-cmd --query-panic

yml 文件

# HTTP port of the demo application.
server:
  port: 8081
spring:
  rabbitmq:
    host: 192.168.126.128
    port: 5672          # AMQP port used by clients (15672 is only the management web UI)
    username: root
    password: root
    virtual-host: /
    listener:
      # Spring Boot's default listener container type is "simple", so the prefetch
      # limit must be set under listener.simple to take effect with the defaults.
      simple:
        prefetch: 1     # deliver at most one unacknowledged message per consumer
      # Only read when spring.rabbitmq.listener.type=direct; kept so the setting
      # still applies if the container type is switched.
      direct:
        prefetch: 1     # same limit for the direct container
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

简单模式

调用

package com.example.demo.work;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller for the simple-mode demo: each GET request publishes one fixed
 * text message through {@link RabbitTemplate}.
 */
@RestController
public class TestSend {
    /** First argument of the send call; the original author uses it as the queue name. */
    private static final String QUEUE_NAME = "workqueue";

    /** Fixed demo payload. */
    private static final String DEMO_PAYLOAD = "测试1!!";

    // Spring Boot integrations expose ready-made template beans
    // (e.g. RestTemplate, RedisTemplate); RabbitTemplate is the one injected here.
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Handles GET {@code testa} and sends the demo payload.
     *
     * <p>Any exception raised while sending is printed via {@code printStackTrace()}
     * and not rethrown, so the handler itself always completes normally.
     */
    @GetMapping("testa")
    public void testSendWork() {
        try {
            // convertAndSend(name, payload): 1) queue name, 2) message content.
            rabbitTemplate.convertAndSend(QUEUE_NAME, DEMO_PAYLOAD);
        } catch (Exception sendFailure) {
            sendFailure.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

接收

package com.example.demo.work;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Consumer for the simple-mode demo: prints every message received from {@code workqueue}. */
@Component
public class RabbitMqListen {
    /** Text written in front of each received payload. */
    private static final String OUTPUT_PREFIX = "当前监听到了:";

    /**
     * Listener callback registered on the {@code workqueue} queue.
     *
     * @param str the received message as text
     */
    @RabbitListener(queues = "workqueue")
    public void workQueue(String str) {
        final String line = OUTPUT_PREFIX + str;
        System.out.println(line);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

工作模式

调用

package com.example.demo.work2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller for the work-queue demo: one GET request publishes a short
 * sequence of integer messages.
 */
@RestController
public class Send1 {
    /** First argument of the send call; matches the queue the demo listeners read from. */
    private static final String QUEUE_NAME = "send1";

    /** Number of messages published per request (payloads 0..7). */
    private static final int MESSAGE_COUNT = 8;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /** Handles GET {@code /send1}: sends the integers 0 through 7, one message each. */
    @GetMapping("/send1")
    public void send() {
        int sequence = 0;
        while (sequence < MESSAGE_COUNT) {
            rabbitTemplate.convertAndSend(QUEUE_NAME, sequence);
            sequence++;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

接收

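在 yml 文件中设置 prefetch: 1 以实现合理分发(能者多劳)。注意:该配置必须写在实际使用的监听容器类型之下,Spring Boot 默认容器类型为 simple,即 spring.rabbitmq.listener.simple.prefetch。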

package com.example.demo.work2;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Two competing consumers on the {@code send1} queue for the work-queue demo:
 * one deliberately slow, one fast.
 *
 * <p>NOTE(review): the article pairs this class with a listener prefetch of 1 so
 * that dispatch is not purely round-robin - confirm the prefetch setting is
 * applied to the container type actually in use.
 */
@Component
public class Recv1s {
    /** Artificial processing time of the slow consumer, in milliseconds. */
    private static final long SLOW_CONSUMER_DELAY_MS = 1000L;

    /**
     * Slow consumer: waits {@link #SLOW_CONSUMER_DELAY_MS} and then prints the message.
     *
     * @param s the received message as text
     * @throws InterruptedException if the thread is interrupted during the delay
     */
    @RabbitListener(queues = "send1")
    public void send1(String s) throws InterruptedException {
        // Constant delay; no need to parse it from a string on every message.
        Thread.sleep(SLOW_CONSUMER_DELAY_MS);
        System.out.println("当前1    " + s);
    }

    /**
     * Fast consumer: prints the message immediately.
     *
     * @param s the received message as text
     */
    @RabbitListener(queues = "send1")
    public void send2(String s) {
        System.out.println("当前2    " + s);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

Publish(广播模式)

调用

package com.example.demo.Publish;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
 * REST controller for the publish/subscribe (fanout) demo: each GET request
 * publishes one fixed message to the {@code logs} exchange.
 */
@RestController
public class publish {
    /** Name of the exchange the message is published to. */
    private static final String EXCHANGE_NAME = "logs";

    /** Routing key; left empty in publish/subscribe mode. */
    private static final String NO_ROUTING_KEY = "";

    /** Fixed demo payload. */
    private static final String DEMO_PAYLOAD = "work message";

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Handles GET {@code /ces} and publishes the demo payload.
     *
     * <p>Any exception raised while sending is printed via {@code printStackTrace()}
     * and not rethrown.
     */
    @GetMapping("/ces")
    public void testSendPublish() {
        try {
            // Arguments: 1) exchange name, 2) routing key (empty here), 3) message body.
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, NO_ROUTING_KEY, DEMO_PAYLOAD);
        } catch (Exception sendFailure) {
            sendFailure.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

接收

package com.example.demo.Publish;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// 定义一个名为 logs 的交换机,类型为 fanout
/*@Queue注解为我们提供了队列相关的一些属性,具体如下:
name: 队列的名称;
durable: 是否持久化;
exclusive: 是否独享、排外的;
autoDelete: 是否自动删除;
arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:*/

/**
 * Subscribers for the publish/subscribe demo. Each listener method binds its own
 * unnamed (temporary) queue to the {@code logs} fanout exchange and prints what
 * it receives.
 */
@Component
public class PublishListen {
    /**
     * First subscriber.
     *
     * @param message the received message as text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,                                       // temporary queue, no name given
            exchange = @Exchange(value = "logs", type = "fanout") // bind to the fanout exchange
    ))
    public void receive(String message) {
        final String line = "receive1 =" + message;
        System.out.println(line);
    }

    /**
     * Second subscriber, with its own temporary queue on the same exchange.
     *
     * @param message the received message as text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,                                       // temporary queue, no name given
            exchange = @Exchange(value = "logs", type = "fanout") // bind to the fanout exchange
    ))
    public void receive2(String message) {
        final String line = "receive2 =" + message;
        System.out.println(line);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

route(路由模式)

创建

package com.example.demo.cesdir;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
/**
 * REST controller for the routing (direct exchange) demo: each GET request
 * publishes six messages to the {@code cesdir} exchange, each with its own
 * routing key.
 */
@RestController
public class Publish1 {
    /** Name of the exchange the messages are published to. */
    private static final String EXCHANGE_NAME = "cesdir";

    /** Number of messages per request (routing keys a0..a5). */
    private static final int KEY_COUNT = 6;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Handles GET {@code /ces11}: for i in 0..5 sends the text {@code "a" + i},
     * using that same text as the routing key.
     *
     * <p>A failure on one send is printed via {@code printStackTrace()} and the
     * loop continues with the next key.
     */
    @GetMapping("/ces11")
    public void testSendPublish() {
        int index = 0;
        while (index < KEY_COUNT) {
            final String routingKey = "a" + index;
            index++;
            try {
                // Arguments: 1) exchange name, 2) routing key, 3) message body.
                rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, routingKey);
            } catch (Exception sendFailure) {
                sendFailure.printStackTrace();
            }
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

接收

package com.example.demo.cesdir;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;

/**
 * Consumers for the routing (direct exchange) demo. Each listener binds its own
 * unnamed queue to the {@code cesdir} direct exchange under a fixed set of
 * routing keys and prints what it receives.
 *
 * <p>NOTE(review): the class is registered through {@code @Configuration} although
 * it declares no beans; the sibling listener classes use {@code @Component}.
 */
@Configuration
public class PublishListen1 {
    /**
     * Receives messages routed with key {@code a1} or {@code a2}.
     *
     * @param a the received message as text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "cesdir", type = "direct"),
            key = {"a1", "a2"}
    ))
    public void ces1(String a) {
        final String line = " a1 a2 :  " + a;
        System.out.println(line);
    }

    /**
     * Receives messages routed with key {@code a3} or {@code a4}.
     *
     * @param a the received message as text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "cesdir", type = "direct"),
            key = {"a3", "a4"}
    ))
    public void ces2(String a) {
        final String line = " a3 a4 :   " + a;
        System.out.println(line);
    }

    /**
     * Receives messages routed with key {@code a4}, {@code a5} or {@code a3};
     * its keys overlap with those of {@link #ces2(String)}.
     *
     * @param a the received message as text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "cesdir", type = "direct"),
            key = {"a4", "a5", "a3"}
    ))
    public void ces3(String a) {
        final String line = " a3 a4 a5 :   " + a;
        System.out.println(line);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

Topics(主题模式)

创建。主题模式和路由模式的区别在于 key 的绑定方式:路由模式使用精确的 key,例如 key={"key1","key2"};主题模式的 key 可以使用通配符,例如 key={"*.key.#"}。其中符号“#”表示匹配零个或多个词,符号“*”表示恰好匹配一个词(词与词之间用“.”分隔)。

package com.example.demo.cesTip;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller for the topic-exchange demo: each GET request publishes six
 * messages to the {@code Topics} exchange with three-word routing keys.
 */
@RestController
public class Publish2 {
    /** Name of the exchange the messages are published to. */
    private static final String EXCHANGE_NAME = "Topics";

    /** Number of messages per request. */
    private static final int MESSAGE_COUNT = 6;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Handles GET {@code ces222}: for i in 0..5 builds the key {@code "ces.a." + i + "c"},
     * prints it, and sends it as both routing key and message body.
     */
    @GetMapping("ces222")
    public void ces222() {
        int index = 0;
        while (index < MESSAGE_COUNT) {
            final String routingKey = "ces.a." + index + "c";
            System.out.println("a1   " + routingKey);
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, routingKey);
            index++;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

接收

package com.example.demo.cesTip;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the topic-exchange demo. Each listener binds its own unnamed
 * (temporary) queue to the {@code Topics} topic exchange with a different
 * wildcard pattern and prints what it receives.
 */
@Component
public class PublishListen2 {

    /**
     * Bound with pattern {@code ces.*.*}.
     *
     * @param message the received message as text
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "Topics", type = "topic"),
            key = {"ces.*.*"},
            value = @Queue // temporary queue, no name given
    ))
    public void receive1(String message) {
        final String line = "receive 1 = " + message;
        System.out.println(line);
    }

    /**
     * Bound with pattern {@code ces.#}.
     *
     * @param message the received message as text
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "Topics", type = "topic"),
            key = {"ces.#"},
            value = @Queue // temporary queue, no name given
    ))
    public void receive2(String message) {
        final String line = "receive 2 = " + message;
        System.out.println(line);
    }

    /**
     * Bound with pattern {@code #.a.*}.
     *
     * @param message the received message as text
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "Topics", type = "topic"),
            key = {"#.a.*"},
            value = @Queue // temporary queue, no name given
    ))
    public void receive3(String message) {
        final String line = "receive 3 = " + message;
        System.out.println(line);
    }

}

 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/388352
推荐阅读
相关标签
  

闽ICP备14008679号