赞
踩
中间件是分布式网络环境中,将不同技术架构的服务相互连接起来处理业务,实现了多种标准的协议接口,可以对接不同语言技术开发的服务,达到系统的解耦合
一个业务处理需要多个服务系统相互协同处理解决,这些服务不是在一个系统中,而是分开的多个相互独立的系统;
好处:
问题:
消息中间件:
优点:
Advanced Message Queuing Protocol 高级消息队列协议
提供统一的一套标准的消息队列协议,是应用层的一个开放标准,面向消息中间件;
基于此协议可以对接不同技术语言的服务
支持分布式事务
支持消息的持久化
处理消息的高性能和高可靠
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eXclIJXE-1662295572445)(\p\RabbitMQ工作模型.png)]
docker 进入容器内部
docker exec -it b1da550086cc /bin/bash
设置用户密码
rabbitmqctl add_user admin admin
设置用户管理员权限
rabbitmqctl set_user_tags admin administrator
退出容器不关闭容器
按住Ctrl+P+Q
Producer生产者:生产消息的一方
Consumer消费者:消费消息的一方
消息并不是直接投递到消息队列中,而是有交换机分配消息到队列中,有四种类型direct(默认),fanout, topic, 和 headers
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IhSGUH6h-1662295572447)(\p\24007899.jpg)]
生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YgPDl9v4-1662295572448)(p\70553134.jpg)]
生产者将消息发送给交换器时,需要一个RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。
用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
RabbitMQ 中消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是topic实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0XMCXu96-1662295572449)(\p\67952922.jpg)]
把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中
如果发送消息的时候设置路由键为“warning”,那么消息会路由到 Queue1 和 Queue2。如果在发送消息的时候设置路由键为"Info”或者"debug”,消息只会路由到Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。
direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XUY71D3y-1662295572450)(p\37008021.jpg)]
direct类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xe9gU1yL-1662295572450)(\p\73843.jpg)]
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在
一个生产者 -> 一个队列 -> 一消费者
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zdAZ4nNp-1662295572451)(p\工作模式-simple.png)]
public class Producer { public static void main(String[] args) { //创建连接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.10.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立连接Connection connection = connectionFactory.newConnection("生产者"); //获取通道Channel channel = connection.createChannel(); //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息 String queueName = "queue1"; /** * 队列名字 * 是否要持久化 * 排他性,是否是个独占队列 * 是否自动删除,最后一个消费者消费完后是否删除队列 * 携带的附属参数 */ channel.queueDeclare(queueName, false, false, false, null); //准备消息内容 String message = "hello"; //发送消息给队列 channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception e) { e.printStackTrace(); } finally { //关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } //关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } } public class Customer { public static void main(String[] args) { //创建连接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.10.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立连接Connection connection = connectionFactory.newConnection("生产者"); //获取通道Channel channel = connection.createChannel(); //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息 channel.basicConsume("queue1", true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息是:" + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("接收消息失败..."); } }); return; } catch (Exception e) { e.printStackTrace(); } finally { //关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } //关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
为什么RabbitMQ是基于通道Channel去处理而不是基于连接Connection?
TCP的连接断开需要消耗很多资源开销,影响性能,
于是在使用Connection长连接基础上,创建多个信道channel处理消息,性能更高,
信道之间相互独立隔离;
可以存在没有交换机的队列吗?
不可能的,虽然没有指定具体的交换机,但是会有个默认的交换机;
轮询模式 : 一个消费者一条,按均分配
自动应答
不会因为服务器处理速度慢而少分配发送消息,还是一样的均分
公平分发模式 : 根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配
要手动应答
相关应用场景:邮件群发,群聊,广播
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1y5IzzU3-1662295572452)(\p\1913282-20220730231211734-1176611840.png)]
package com.ung.rabbit.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @Author ung * @Description: 生产者 */ public class Producer { public static void main(String[] args) { //创建连接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.10.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立连接Connection connection = connectionFactory.newConnection("生产者"); //获取通道Channel channel = connection.createChannel(); //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息 //创建交换机 String exchangeName = "fanout_exchange"; //类型 String type ="fanout"; // 3.声明交换机对象 channel.exchangeDeclare(exchangeName, type); //路由键 String routekey =""; String queueName1 = "queue1"; String queueName2 = "queue2"; String queueName3 = "queue3"; /** * 队列名字 * 是否要持久化 * 排他性,是否是个独占队列 * 是否自动删除,最后一个消费者消费完后是否删除队列 * 携带的附属参数 * * * 创建队列 */ channel.queueDeclare(queueName1, false, false, false, null); channel.queueDeclare(queueName2, false, false, false, null); channel.queueDeclare(queueName3, false, false, false, null); // 5.绑定到交互机 // fanout_exchange 绑定了 3个队列 channel.queueBind(queueName1, exchangeName,routekey);//指定交换机 channel.queueBind( queueName2, exchangeName, routekey); channel.queueBind( queueName3, exchangeName, routekey); //准备消息内容 String message = "hello fanout"; //发送消息给队列 channel.basicPublish(exchangeName, routekey, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception e) { e.printStackTrace(); } finally { //关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } //关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
package com.ung.rabbit.routing; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author ung * @Description: 消费者 */ public class Customer { public static Runnable runnable = new Runnable() { @Override public void run() { //创建连接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.10.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立连接Connection connection = connectionFactory.newConnection("生产者"); //获取通道Channel channel = connection.createChannel(); //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息 String queueName = Thread.currentThread().getName(); channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(queueName+"收到消息是: " + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("接收消息失败..."); } }); return; } catch (Exception e) { e.printStackTrace(); } finally { //关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } //关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(runnable,"queue1").start(); new Thread(runnable,"queue2").start(); new Thread(runnable,"queue3").start(); } }
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WXPGX5yZ-1662295572453)(p\1913282-20220730231303785-1294379445.png)]
package com.ung.rabbit.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @Author ung * @Description: 生产者 */ public class Producer { public static void main(String[] args) { //创建连接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.10.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立连接Connection connection = connectionFactory.newConnection("生产者"); //获取通道Channel channel = connection.createChannel(); //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息 //创建交换机 String exchangeName = "direct_exchange"; //类型 String type ="direct"; // 3.声明交换机对象 channel.exchangeDeclare(exchangeName, type); //路由键 String routekey1 ="k1"; String routekey2 ="k2"; String routekey3 ="k3"; String queueName1 = "queue1"; String queueName2 = "queue2"; String queueName3 = "queue3"; /** * 队列名字 * 是否要持久化 * 排他性,是否是个独占队列 * 是否自动删除,最后一个消费者消费完后是否删除队列 * 携带的附属参数 * * * 创建队列 */ channel.queueDeclare(queueName1, false, false, false, null); channel.queueDeclare(queueName2, false, false, false, null); channel.queueDeclare(queueName3, false, false, false, null); // 5.绑定到交互机 // fanout_exchange 绑定了 3个队列 channel.queueBind(queueName1, exchangeName,routekey1);//指定交换机 channel.queueBind( queueName2, exchangeName, routekey2); channel.queueBind( queueName3, exchangeName, routekey3); //准备消息内容 String message1 = "hello direct1"; String message2 = "hello direct2"; String message3 = "hello direct3"; //发送消息给队列 channel.basicPublish(exchangeName, routekey1, null, message1.getBytes()); channel.basicPublish(exchangeName, routekey2, null, message2.getBytes()); channel.basicPublish(exchangeName, routekey3, null, message3.getBytes()); System.out.println("消息发送成功!"); } catch (Exception e) { e.printStackTrace(); } finally { //关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } //关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
package com.ung.rabbit.routing; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author ung * @Description: 消费者 */ public class Customer { public static Runnable runnable = new Runnable() { @Override public void run() { //创建连接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.10.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立连接Connection connection = connectionFactory.newConnection("生产者"); //获取通道Channel channel = connection.createChannel(); //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息 String queueName = Thread.currentThread().getName(); channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息是: "+queueName + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("接收消息失败..."); } }); return; } catch (Exception e) { e.printStackTrace(); } finally { //关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } //关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(runnable,"queue1").start(); new Thread(runnable,"queue2").start(); new Thread(runnable,"queue3").start(); } }
一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。
此时的自己唯一的Routekey,不是一个确定值,像我们熟悉的正则表达式对应的匹配规则。
生产者产生消息,把消息交给交换机,交换机根据RouteKey的模糊匹配到对应的队列,由队列监听消费者消费消息。
命名规则是多个单词用顿号(.)分隔开代表代表一个单词,*代表多个单词,#代表0个或多个
package com.ung.rabbit.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @Author ung * @Description: 生产者 */ public class Producer { public static void main(String[] args) { //创建连接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.10.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立连接Connection connection = connectionFactory.newConnection("生产者"); //获取通道Channel channel = connection.createChannel(); //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息 //创建交换机 String exchangeName = "topic_exchange"; //类型 String type ="topic"; // 3.声明交换机对象 channel.exchangeDeclare(exchangeName, type); String queueName1 = "queue1"; String queueName2 = "queue2"; String queueName3 = "queue3"; /** * 队列名字 * 是否要持久化 * 排他性,是否是个独占队列 * 是否自动删除,最后一个消费者消费完后是否删除队列 * 携带的附属参数 * * * 创建队列 */ channel.queueDeclare(queueName1, false, false, false, null); channel.queueDeclare(queueName2, false, false, false, null); channel.queueDeclare(queueName3, false, false, false, null); //路由键 String routekey1 ="k.*"; String routekey2 ="k.#"; String routekey3 ="k.k"; // 5.绑定到交互机 // fanout_exchange 绑定了 3个队列 channel.queueBind(queueName1, exchangeName,routekey1);//指定交换机 channel.queueBind( queueName2, exchangeName, routekey2); channel.queueBind( queueName3, exchangeName, routekey3); //准备消息内容 String message1 = "hello topic1"; String message2 = "hello topic2"; String message3 = "hello topic3"; //发送消息给队列 channel.basicPublish(exchangeName, "k.k", null, message1.getBytes()); // channel.basicPublish(exchangeName, routekey2, null, message2.getBytes()); // channel.basicPublish(exchangeName, routekey3, null, message3.getBytes()); System.out.println("消息发送成功!"); } catch (Exception e) { e.printStackTrace(); } finally { //关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } //关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
package com.ung.rabbit.routing; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author ung * @Description: 消费者 */ public class Customer { public static Runnable runnable = new Runnable() { @Override public void run() { //创建连接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.10.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立连接Connection connection = connectionFactory.newConnection("生产者"); //获取通道Channel channel = connection.createChannel(); //创建交换机,申明队列,绑定关系,路由key,发送消息,接收消息 String queueName = Thread.currentThread().getName(); channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息是: "+queueName + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("接收消息失败..."); } }); return; } catch (Exception e) { e.printStackTrace(); } finally { //关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } //关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(runnable,"queue1").start(); new Thread(runnable,"queue2").start(); new Thread(runnable,"queue3").start(); } }
新建一个springboot项目作为生产者
先导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
yml配置文件配置
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 192.168.10.100
port: 5672
配置config
声明交换机,队列,绑定交换机和队列
package com.ung.rabbitmq_springboot.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author ung * @Description: */ @Configuration public class RabbitMQConfig { //声明交换机 fanout模式 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout_order_exchange", true, false); } //声明队列 sms.fanout.queue 、email.fanout.queue 、shortMessage.fanout.queue @Bean public Queue smsQueue() { return new Queue("sms.fanout.queue", true); } @Bean public Queue emailQueue() { return new Queue("email.fanout.queue", true); } @Bean public Queue shortMessageQueue() { return new Queue("shortMessage.fanout.queue", true); } //交换机和队列进行绑定 @Bean public Binding smsBinding() { return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); } @Bean public Binding emailBinding() { return BindingBuilder.bind(emailQueue()).to(fanoutExchange()); } @Bean public Binding shortMessageBinding() { return BindingBuilder.bind(shortMessageQueue()).to(fanoutExchange()); } }
service发送消息
package com.ung.rabbitmq_springboot; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.UUID; /** * @Author ung * @Description: */ @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; public void makeOrder(String userId,String productId, int num){ //根据商品id查库存 String orderId = UUID.randomUUID().toString(); System.out.println("订单生成成功:"+orderId); //保存订单 //分发消息 /** * 1、交换机 * 2、路由键/队列名 * 3、消息 */ String exchangeName ="fanout_order_exchange"; String routingKey =""; rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId); } }
在springboot测试类中启动
@SpringBootTest
class RabbitmqSpringbootApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
orderService.makeOrder("1","1",1);
}
}
消息发送成功
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AtlBFVrd-1662295572454)(\p\image-20220901202410228.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-toEvaZDc-1662295572455)(p\image-20220901202604981.png)]
新建另一个spring boot消费者
一样的依赖和配置文件
创建消费者类,监听队列,注意要注入到spring容器
package com.ung.springboot_rabbitmq_client.service.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author ung * @Description: */ @RabbitListener(queues = {"email.fanout.queue"}) @Service public class FanoutEmailConsumer { @RabbitHandler public void receiveMessage(String message){ System.out.println("收到email:"+message); } }
package com.ung.springboot_rabbitmq_client.service.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author ung * @Description: */ @RabbitListener(queues = {"shortMessage.fanout.queue"}) @Service public class FanoutShortMessageConsumer { @RabbitHandler public void receiveMessage(String message){ System.out.println("收到shortMessage:"+message); } }
package com.ung.springboot_rabbitmq_client.service.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author ung * @Description: */ @RabbitListener(queues = {"sms.fanout.queue"}) @Service public class FanoutSMSConsumer { @RabbitHandler public void receiveMessage(String message){ System.out.println("收到sms:"+message); } }
消费消息成功
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KqHk08Du-1662295572455)(p\image-20220901202651330.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vdmd1f35-1662295572456)(p\image-20220901202711468.png)]
config配置
package com.ung.rabbitmq_springboot.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author ung * @Description: */ @Configuration public class RabbitMQDirectConfig { //声明交换机 direct模式 @Bean public DirectExchange directExchange() { return new DirectExchange("direct_order_exchange", true, false); } //声明队列 sms.direct.queue 、email.direct.queue 、shortMessage.direct.queue @Bean public Queue smsQueue() { return new Queue("sms.direct.queue", true); } @Bean public Queue emailQueue() { return new Queue("email.direct.queue", true); } @Bean public Queue shortMessageQueue() { return new Queue("shortMessage.direct.queue", true); } //交换机和队列进行绑定 @Bean public Binding smsBinding() { return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms"); } @Bean public Binding emailBinding() { return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email"); } @Bean public Binding shortMessageBinding() { return BindingBuilder.bind(shortMessageQueue()).to(directExchange()).with("shortMessage"); } }
service 里发送消息
package com.ung.rabbitmq_springboot; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.UUID; /** * @Author ung * @Description: */ @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; public void makeOrderDirect(String userId, String productId, int num) { //根据商品id查库存 String orderId = UUID.randomUUID().toString(); System.out.println("订单生成成功:" + orderId); //保存订单 //分发消息 /** * 1、交换机 * 2、路由键/队列名 * 3、消息 */ String exchangeName = "direct_order_exchange"; String routingKey = "sms"; rabbitTemplate.convertAndSend(exchangeName, "sms", "sms" + orderId); rabbitTemplate.convertAndSend(exchangeName, "email", "email" + orderId); rabbitTemplate.convertAndSend(exchangeName, "shortMessage", "shortMessage" + orderId); } }
消费者这边
package com.ung.springboot_rabbitmq_client.service.direct; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author ung * @Description: */ @RabbitListener(queues = {"sms.direct.queue"}) @Service public class DirectSMSConsumer { @RabbitHandler public void receiveMessage(String message){ System.out.println("收到sms:"+message); } }
package com.ung.springboot_rabbitmq_client.service.direct; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author ung * @Description: */ @RabbitListener(queues = {"shortMessage.direct.queue"}) @Service public class DirectShortMessageConsumer { @RabbitHandler public void receiveMessage(String message){ System.out.println("收到shortMessage:"+message); } }
package com.ung.springboot_rabbitmq_client.service.direct; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @Author ung * @Description: */ @RabbitListener(queues = {"email.direct.queue"}) @Service public class DirectEmailConsumer { @RabbitHandler public void receiveMessage(String message){ System.out.println("收到email:"+message); } }
使用注解方式
符号“#”匹配路由键的一个或多个词,符号“*”匹配路由键的一个词
消费者
package com.ung.springboot_rabbitmq_client.service.topic; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Service; /** * @Author ung * @Description: */ //@RabbitListener(queues = {"email.topic.queue"}) @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"), exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC), key = "*.email.*" )) @Service public class TopicEmailConsumer { @RabbitHandler public void receiveMessage(String message){ System.out.println("收到email:"+message); } }
package com.ung.springboot_rabbitmq_client.service.topic; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Service; /** * @Author ung * @Description: */ //@RabbitListener(queues = {"shortMessage.topic.queue"}) @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "shortMessage.topic.queue",durable = "true",autoDelete = "false"), exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC), key = "*.shortMessage.#" )) @Service public class TopicShortMessageConsumer { @RabbitHandler public void receiveMessage(String message){ System.out.println("收到shortMessage:"+message); } }
package com.ung.springboot_rabbitmq_client.service.topic; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Service; /** * @Author ung * @Description: */ //@RabbitListener(queues = {"sms.topic.queue"}) @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"), exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC), key = "#.sms.#" )) @Service public class TopicSMSConsumer { @RabbitHandler public void receiveMessage(String message) { System.out.println("收到sms:" + message); } }
生产者
public void makeOrderTopic(String userId, String productId, int num) { //根据商品id查库存 String orderId = UUID.randomUUID().toString(); System.out.println("订单生成成功:" + orderId); //保存订单 //分发消息 /** * 1、交换机 * 2、路由键/队列名 * 3、消息 */ String exchangeName = "direct_order_exchange"; //#.sms.# //*.email.* //*.shortMessage.# String routingKey = "sms"; rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId); }
对消息设置过期时间,在这个时间内都可以被消费者消费,一旦这个时间过去后,就会被自动删除
可以对消息和队列设置TTL
创建交换机和队列,并给队列设置TTL过期时间
package com.ung.rabbitmq_springboot.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @Author ung * @Description: */ @Configuration public class RabbitMQTTLConfig { //声明交换机 @Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl_order_exchange", true, false); } //声明队列 @Bean public Queue ttlDirectQueue() { Map<String,Object> args = new HashMap<>(); args.put("x-message-ttl",5000);//这里一定是int类型 注意是 x-message-ttl的参数 return new Queue("ttl.direct.queue",true,false,false,args);} //绑定队列和交换机 @Bean public Binding ttlDirectBinding() { return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with("ttl"); } }
给交换机发送消息
public void makeOrderTTLQueue(String userId, String productId, int num) {
//根据商品id查库存
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
String exchangeName = "ttl_order_exchange";
String routingKey = "ttl";
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
}
创建ttl队列,有ttl标志 可以看到有一个消息在里面,过5秒后自动删除
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5maFKzEg-1662295572457)(p\image-20220904153451005.png)]
声明普通队列
//声明交换机 @Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl_order_exchange", true, false); } //声明一个普通队列 @Bean public Queue directMessageQueue() { return new Queue("ttl.message.direct.queue", true); } //绑定队列和交换机 @Bean public Binding ttlMessageDirectBinding() { return BindingBuilder.bind(directMessageQueue()).to(ttlDirectExchange()).with("ttlMessage"); }
发送消息时指定消息的TTL过期时间
public void makeOrderTTLMessage(String userId, String productId, int num) { //根据商品id查库存 String orderId = UUID.randomUUID().toString(); System.out.println("订单生成成功:" + orderId); String exchangeName = "ttl_order_exchange"; String routingKey = "ttlMessage"; //给消息设置过期时间 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置过期时间5s message.getMessageProperties().setExpiration("5000"); message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId,messagePostProcessor); }
可以看到创建了一个队列,并有一个消息,5秒后自动删除
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JY74xdZy-1662295572457)(p\image-20220904154500160.png)]
如果队列设置了TTL,消息也设置了TTl,那会使用哪一个TTL过期时间?
哪个TTL时间小,最先过期就使用哪一个
那TTL队列和TTL消息最大的区别,有什么不同?
给队列设置TTL过期时间,相当于给这个队列一个TTL的标记,当队列的消息过期后,
可以进入配置的死信队列中,做下一步处理,
而消息设置过期时间,在普通队列中会自动删除,不会进入到死信队列中
DLX,全称 Dead-Letter-Exchange
也叫死信交换机
当一个消息在队列中变成死信消息后,会被重新发送到死信交换机;绑定DLX死信交换机的队列就是死信队列
那消息如何才能变为死信消息?
队列在代码中之前已经配置好了,再修改的话会报错
创建死信交换机,死信队列,其实就是跟普通交换机,队列一样
package com.ung.rabbitmq_springboot.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @Author ung * @Description: */ @Configuration public class RabbitMQDeadConfig { //声明交换机 @Bean public DirectExchange deadDirectExchange() { return new DirectExchange("dead_order_exchange", true, false); } //声明一个普通队列 @Bean public Queue deadQueue() { return new Queue("dead.direct.queue", true); } //绑定队列和交换机 @Bean public Binding deadBinding() { return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead"); } }
最关键的设置
在普通队列中设置绑定死信交换机和死信队列路由键
package com.ung.rabbitmq_springboot.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @Author ung * @Description: */ @Configuration public class RabbitMQTTLConfig { //声明交换机 @Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl_order_exchange", true, false); } //声明队列 @Bean public Queue ttlDirectQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000);//这里一定是int类型 注意是 x-message-ttl //设置死信交换机 args.put("x-dead-letter-exchange", "dead_order_exchange");//注意是 x-dead-letter-exchange //设置死信队列路由键 args.put("x-dead-letter-routing-key", "dead");//注意是 x-dead-letter-routing-key return new Queue("ttl.direct.queue", true, false, false, args); } //声明一个普通队列 @Bean public Queue directMessageQueue() { return new Queue("ttl.message.direct.queue", true); } //绑定队列和交换机 @Bean public Binding ttlMessageDirectBinding() { return BindingBuilder.bind(directMessageQueue()).to(ttlDirectExchange()).with("ttlMessage"); } //绑定队列和交换机 @Bean public Binding ttlDirectBinding() { return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with("ttl"); } }
跟过期时间队列一样的发送消息
可以看到死信队列创建,原来的TTL队列有DLX的标志
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f816rwvj-1662295572463)(p\image-20220904160934569.png)]
过5秒后,TTL队列里的消息过期,就会到死信队列里
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iBwJmRvn-1662295572464)(p\image-20220904161008336.png)]
//声明队列
@Bean
public Queue ttlDirectQueue() {
Map<String, Object> args = new HashMap<>();
//设置队列最大长度
args.put("x-max-length", 5);//x-max-length
args.put("x-message-ttl", 5000);//这里一定是int类型 注意是 x-message-ttl
//设置死信交换机
args.put("x-dead-letter-exchange", "dead_order_exchange");//注意是 x-dead-letter-exchange
//设置死信队列路由键
args.put("x-dead-letter-routing-key", "dead");//注意是 x-dead-letter-routing-key
return new Queue("ttl.direct.queue", true, false, false, args);
}
发送11条消息
设置队列最大长度5 后,超出的消息会被发送到死信队列中
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OHKAMrtU-1662295572465)(p\image-20220904161948827.png)]
RabbitMQ的内存大小
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zhhPpvOK-1662295572466)(p\image-20220904163212972.png)]
当出现警告的时候,可以通过配置去修改和调整
参考帮助文档:http://www.rabbbitmq.com/configure.html
命令的方式
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 为内存阈值。默认情况是:0.4/2GB 代表的含义是:当 RabbitMQ的内存超过40%时,就会产生警告并且会阻塞所有生产者的连接。
通过此命令修改阈值在 Broker重启以后将会失效,通过修改配置文件设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 Broker才会生效
使用配置文件的方式修改
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ju5Loj3A-1662295572466)(p\image-20220904163456880.png)]
内存换页
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p8xG6j8u-1662295572467)(p\image-20220904164600449.png)]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。