赞
踩
异步调用中用Broker进行事件订阅和调用,完成解耦
没有强依赖,不用担心级联失败
流量削峰
1.可以使用命令拉取镜像
docker pull rabbitmq:3-management
2.也可以直接去官网下载tar包,然后上传到虚拟机上面
只需要简单的引入amqp依赖,
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
然后配置接收和发送端的地址
- spring:
- rabbitmq:
- host: 192.168.xxx.xxx # 自己主机名
- port: 5672 # 端口
- virtual-host: / # 虚拟主机
- username: xxxxxx # 用户名
- password: xxxxxx # 密码
然后调用方法发送或结合搜消息即可
发送:
- package cn.itcast.mq.spring;
-
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class SpringAmqpTest {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSimpleQueue() {
- // 队列名称
- String queueName = "simple.queue";
- // 消息
- String message = "hello, spring amqp!";
- // 发送消息
- rabbitTemplate.convertAndSend(queueName, message);
- }
- }
接收:
- package cn.itcast.mq.listener;
-
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class SpringRabbitListener {
-
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueueMessage(String msg) throws InterruptedException {
- System.out.println("spring 消费者接收到消息:【" + msg + "】");
- }
- }
当然了,如果不是在父工程里面配置的依赖则需要在单个项目里面单独配置
假设编辑五百条消息:
- /**
- * workQueue
- * 向队列中不停发送消息,模拟消息堆积。
- */
- @Test
- public void testWorkQueue() throws InterruptedException {
- // 队列名称
- String queueName = "simple.queue";
- // 消息
- String message = "hello, message_";
- for (int i = 0; i < 500; i++) {
- // 发送消息
- rabbitTemplate.convertAndSend(queueName, message + i);
- Thread.sleep(20);
- }
- }
定义两个接受者用不同效率接收:
- @RabbitListener(queues = "simple.queue")
- public void listenWorkQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
- Thread.sleep(20);
- }
-
- @RabbitListener(queues = "simple.queue")
- public void listenWorkQueue2(String msg) throws InterruptedException {
- System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
- Thread.sleep(200);
- }
然后可以看出接受者接收消息并未在预定时间被消费,原因是被队列平均分配了,只要重新定制规则即可:
- spring:
- rabbitmq:
- listener:
- simple:
- prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
创建FanoutExchange交换机和Queue队列,然后交换机和队列绑定:
- package cn.itcast.mq.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class FanoutConfig {
- /**
- * 声明交换机
- * @return Fanout类型交换机
- */
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("itcast.fanout");
- }
-
- /**
- * 第1个队列
- */
- @Bean
- public Queue fanoutQueue1(){
- return new Queue("fanout.queue1");
- }
-
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
- return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
- }
-
- /**
- * 第2个队列
- */
- @Bean
- public Queue fanoutQueue2(){
- return new Queue("fanout.queue2");
- }
-
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
- return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
- }
- }
绑定完成后就可以写消费者和生产者代码了
生产者
- @Test
- public void testFanoutExchange() {
- // 队列名称
- String exchangeName = "itcast.fanout";
- // 消息
- String message = "hello, everyone!";
- rabbitTemplate.convertAndSend(exchangeName, "", message);
- }
消费者
- @RabbitListener(queues = "fanout.queue1")
- public void listenFanoutQueue1(String msg) {
- System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
- }
-
- @RabbitListener(queues = "fanout.queue2")
- public void listenFanoutQueue2(String msg) {
- System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
- }
交换机的作用是什么?
* 接收publisher发送的消息
* 将消息按照规则路由到与之绑定的队列
* 不能缓存消息,路由失败,消息丢失
* FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
* Queue
* FanoutExchange
* Binding
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。