当前位置:   article > 正文

Spring boot整合RabbitMQ_springboot整合rabbitmq

springboot整合rabbitmq
目录:

1.简介

2.实战:RabbitMQ四种消息交换机类型
    2.1 Direct Exchange
    2.2 Fanout Exchange
    2.3 Topic Exchange
    2.4 Header Exchange
3.总结

一、RabbitMQ简介

  RabbitMQ安装不必我多说,linux和windows环境下都可以安装,我是在windows环境下进行安装的,安装RabbitMQ之前需要先安装Erlang,因为Rabbit MQ 是建立在Erlang OTP平台上,而且安装Erlang 时要注意安装的RabbityMQ 所依赖的Erlang版本。
  RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而且支持跨平台。
  下图主要介绍了消息的推送和接收的过程:
在这里插入图片描述
生产者:创建需要发送的消息
queues:存储消息的队列
交换机:将消息转发到队列上
消费者:以监听的方式获取对应的消息

二、RabbitMQ的四种交换机类型实现

搭建环境:

(1)创建spring boot项目,引入web和rabbitmq依赖。
在这里插入图片描述
pom.xml如下

<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

(2)在application.properties文件中配置rabbitMq的基本信息,数据传输的默认端口号是5672

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
  • 1
  • 2
  • 3
  • 4
2.1 direct交换机的实现

直连型交换机,根据消息携带的路由键将消息投递给对应队列。

大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。

创建基于direct交换机的rabbitmq配置类。

@Configuration
public class RabbitDirectConfig {
    public final static String DIRECTNAME = "direct";

    //注入队列
    @Bean
    Queue queue() {
        return new Queue("hello.direct");
    }

    //注入交换机
    @Bean
    DirectExchange directExchange() {
        //durable:重启后是否有效 autodelete:长期未使用是否删除掉
        return new DirectExchange(DIRECTNAME, true, false);
    }

    //将队列和交换机进行绑定
    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

创建receiver类接收消息

@Component
public class DirectReceiver {

    @RabbitListener(queues = "hello.direct")
    public void handler1(String msg){
        System.out.println("handler1====="+msg);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

编写测试类

//注入Rabbitmq发送消息的模板对象
	@Autowired
	RabbitTemplate rabbitTemplate;
	@Test
	void contextLoads() {
		//routingKey需要转发到对应队列的名字 object 消息对象
		rabbitTemplate.convertAndSend("hello.direct", "hello fff!haa");
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

启动springboot项目,然后运行测试类进行消息的发送,receiver会将收到的消息打印在控制台
在这里插入图片描述

2.2 Fanout交换机的实现

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

创建基于Fanout交换机的rabbitmq配置类。

@Configuration
public class RabbitFanoutConfig {
    //创建两个队列并绑定到fanout交换机上
    public static final String FANOUTNAME = "fanout";
    @Bean
    Queue queueOne() {
        return new Queue("queue-one");
    }

    @Bean
    Queue queueTwo() {
        return new Queue("queue-two");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUTNAME, true, false);
    }

    @Bean
    Binding bindingOne() {
        return BindingBuilder.bind(queueOne()).to(fanoutExchange());
    }
    @Bean
    Binding bindingTwo() {
        return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

创建receiver类接收消息

@Component
public class FanoutReceiver {
    @RabbitListener(queues = "queue-one")
    public void handler1(String msg) {
        System.out.println("FanoutReceiver:handler1:" + msg);
    }

    @RabbitListener(queues = "queue-two")
    public void handler2(String msg) {
        System.out.println("FanoutReceiver:handler2:" + msg);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

编写测试类,发送一条消息,fanoutexchange会将此消息转发给绑定到它身上的所有队列

@Test
	public void test1() {
		rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"hello fanout!");
	}
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

2.3 Topic交换机的实现

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。

简单地介绍下规则:
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词

通配的绑定键是跟队列进行绑定的,举个小例子

队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;

创建基于Topic交换机的rabbitmq配置类。

@Configuration
public class RabbitTopicConfig {
	//创建三个队列,绑定交换机时可以规定通配规则
    public static final String TOPICNAME = "topic";

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(TOPICNAME, true, false);
    }

    @Bean
    Queue xiaomi() {
        return new Queue("xiaomi");
    }

    @Bean
    Queue huawei() {
        return new Queue("huawei");
    }

    @Bean
    Queue phone() {
        return new Queue("phone");
    }

    @Bean
    Binding xiaomiBinding() {
        return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
    }

    @Bean
    Binding huaweiBinding() {
        return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
    }
    @Bean
    Binding phoneBinding() {
        return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

创建receiver类接收消息

@Component
public class TopicReceiver {
    @RabbitListener(queues = "xiaomi")
    public void handler1(String msg) {
        System.out.println("TopicReceiver:handler1:" + msg);
    }

    @RabbitListener(queues = "huawei")
    public void handler2(String msg) {
        System.out.println("TopicReceiver:handler2:" + msg);
    }

    @RabbitListener(queues = "phone")
    public void handler3(String msg) {
        System.out.println("TopicReceiver:handler3:" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

编写测试类

@Test
	public void test2() {
	    //匹配到xiaomi的消息队列
		rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "xiaomi.news", "小米新闻");
		//匹配到phone的消息队列
		rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "vivo.phone", "vivo 手机");
		//匹配到huawei和phone的消息队列
		rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "huawei.phone", "华为手机");
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

前3条信息打印在项目的控制台,后一条信息打印在测试的控制台,共有4条信息被打印
在这里插入图片描述
在这里插入图片描述

2.4 Header交换机的实现

header交换机取消了routingkey,使用header中的key/value来匹配队列

创建基于Header交换机的rabbitmq配置类。

@Configuration
public class RabbitHeaderConfig {
    public static final String HEADERNAME = "header";

    @Bean
    HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERNAME, true, false);
    }

    @Bean
    Queue queueName() {
        return new Queue("name-queue");
    }

    @Bean
    Queue queueAge() {
        return new Queue("age-queue");
    }

    @Bean
    Binding bindingName() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", "李白");
        //whereAny 只要匹配到map中任意一个key-value即可 ,扩展:whereAll必须具有map中所有的key-value
        return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();
    }

    @Bean
    Binding bindingAge() {
        //where 只要age这个key存在即可,value不论取何值
        return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

创建receiver类接收消息

@Component
public class HeaderReceiver {
    @RabbitListener(queues = "name-queue")
    public void handler1(byte[] msg) {
        System.out.println("HeaderReceiver:handler1:" + new String(msg, 0, msg.length));
    }

    @RabbitListener(queues = "age-queue")
    public void handler2(byte[] msg) {
        System.out.println("HeaderReceiver:handler2:" + new String(msg, 0, msg.length));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

编写测试类

	@Test
	public void test3() {
	    //创建org.springframework.amqp.core.Message,并添加消息的头部
		Message nameMsg = MessageBuilder.withBody("hello javaboy !".getBytes()).setHeader("name","李白").build();
		Message ageMsg = MessageBuilder.withBody("hello 99 !".getBytes()).setHeader("age","99").build();
		rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
		rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在这里插入图片描述
至此,rabbitMq四种交换机类型全部介绍完了。

三、总结:

springboot整合rabbitMq也算入了个门,但是rabbitMq的应用场景比如异步、解耦等还需要在以后的项目开发中探索!!!!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/932002
推荐阅读
相关标签
  

闽ICP备14008679号