赞
踩
中间件可以理解为一个帮助不同软件、应用或系统之间交流和数据传输的工具或服务。就像一个翻译员在两个讲不同语言的人之间传递信息,让他们能够互相理解和沟通。中间件位于客户端(比如你的电脑或手机应用)和服务器(存放数据和运行服务的强大计算机)之间,确保数据顺利传输,同时还可以提供额外的功能,比如安全性、数据管理和消息服务等。
简单来说,中间件是一种软件服务,用于连接不同的应用程序和系统,帮助它们更好地工作和交流,无论它们是在同一个地方还是分布在全球不同的位置。这就像是建立在不同软件组件之间的桥梁,确保信息能够顺畅、安全地流动。
具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担。中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。
1.互操作性和兼容性: 中间件使得不同平台、不同技术和不同语言编写的应用程序能够相互交流和协作。它提供了一种标准的方法来促进不同系统之间的通信,无论这些系统的底层技术如何不同。
2.抽象层: 中间件提供了一个抽象层,隐藏了底层网络和数据通信的复杂性,使得开发者可以专注于应用逻辑的开发,而不需要深入了解底层技术的细节。
3.可扩展性: 中间件设计时考虑到了系统的可扩展性,可以支持从几个到成千上万个并发用户和服务。这使得应用程序可以根据需要轻松扩展,以适应不断增长的用户需求。
4.安全性: 中间件还提供了安全机制,包括认证、授权、加密和数据完整性验证等,以保证数据传输的安全和防止未经授权的访问。
5.高可用性和可靠性: 通过提供故障转移、负载均衡和事务管理等功能,中间件确保了应用程序的高可用性和可靠性。即使在部分系统组件失败的情况下,也能保证服务的连续性和数据的完整性。
6.异步通信和消息队列: 中间件支持异步通信模式,允许应用程序在不直接等待响应的情况下发送和接收消息。这通过使用消息队列等技术实现,提高了应用程序的性能和响应速度。
消息中间件是一种特定类型的中间件,专注于消息的传递和交换。它允许不同的应用程序、系统或软件组件之间通过消息来进行通信,而不是直接调用对方的接口或方法。这种通信方式可以是异步的,也就是说,发送消息的一方不需要等待接收方立即处理消息和回应。消息中间件通常提供了一系列功能,以支持复杂的消息处理模式,包括但不限于消息队列、发布/订阅模型、消息路由和消息持久化。
1.系统解耦:在复杂的应用架构中,各个系统或服务之间的直接依赖会导致维护和扩展变得困难。消息中间件通过提供一个中介层,允许服务之间通过消息进行通信,从而减少了它们之间的直接耦合。这样,即使某个服务发生变化,也不会直接影响到其他服务。
2.异步处理:在处理耗时操作(如发送电子邮件、生成报告等)时,可以将这些任务异步化,即发送一个消息到消息队列,然后继续处理其他任务。后台服务可以从队列中取出消息并处理这些耗时操作,这样可以提高应用的响应速度和用户体验。
3.负载均衡:通过消息中间件,可以将任务分散到多个处理单元上执行,从而实现负载均衡。这对于处理大量并发请求或大数据处理尤为重要,可以有效分配系统资源,避免某个服务因过载而崩溃。
4.数据同步:在分布式系统中,确保数据在不同的服务或数据库之间保持一致性是一项挑战。通过消息中间件,可以实现数据的变更通知和同步,当一个服务更新了数据后,可以通过消息通知其他服务进行相应的更新。
5.事件驱动架构:在事件驱动架构中,系统的行为是由事件触发的。消息中间件允许服务发布事件(消息)到一个共享的事件通道,其他服务可以订阅这些事件并作出响应。这种模式支持高度模块化和动态的系统行为,使得应用能够灵活地响应各种事件。
6.微服务架构:在微服务架构中,应用被分解成许多小型、独立的服务,它们通过网络进行通信。消息中间件是实现服务间通信的理想选择,因为它支持异步消息传递、服务解耦和弹性扩展。
7.大数据处理和实时流处理:在大数据和实时数据流处理场景中,消息中间件(如Apache Kafka)能够处理高吞吐量的数据流。它允许数据被实时捕获、存储、处理和分析,支持复杂的数据处理管道和实时分析应用。
它是一种接受数据,接受请求、存储数据、发送数据等功能的技术服务。
1.消息队列:消息队列是消息中间件的基本组成部分,它暂时存储在发送者和接收者之间传递的消息。队列确保消息能够按照发送的顺序被逐一处理,即使接收者暂时无法处理消息,也能保证消息不会丢失。
2.交换器(Exchange):在一些消息中间件系统中,特别是使用发布/订阅模式的系统里,交换器负责接收生产者发送的消息,并根据路由规则决定消息应该发送到哪个队列。交换器使得消息分发更加灵活和强大。
3.消息通道:消息通道是连接应用程序和消息中间件的通信路径。它可以是点对点的,也可以是发布/订阅模式下的多对多通信。消息通道抽象了底层的网络通信细节,为消息的发送和接收提供了一个简单的接口。
4.消息代理(Broker):消息代理是消息中间件的中心组件,它管理着消息队列、处理消息的路由、传递和可能的持久化。消息代理还负责处理消息的接收、存储和转发给目标接收者。
5.客户端库:客户端库提供了一套API,使得应用程序能够与消息中间件系统交互,包括发送和接收消息。这些库通常是针对特定编程语言设计的,简化了消息中间件的集成和使用。
6.管理和监控工具:为了确保消息中间件的稳定运行和高效性能,管理和监控工具是必不可少的。这些工具可以监控消息流量、队列长度、处理延迟等指标,并提供配置管理、故障诊断和性能调优的功能。
7.持久化存储:为了保证消息不会因为系统故障而丢失,许多消息中间件支持将消息持久化到磁盘或其他存储系统。这确保了即使在发生故障的情况下,消息也能够被安全地恢复。
RabbitMQ 主要使用的协议是 AMQP(高级消息队列协议,Advanced Message Queuing Protocol)。AMQP 是一个应用层协议,专为异步消息队列提供标准化的通信协议,旨在确保兼容性和可靠性,支持多种消息中间件产品之间的互操作性。
AMQP 定义了消息传递的各种方面,包括消息的排队、路由(包括点对点和发布/订阅模式)、安全性、事务和其他消息服务。RabbitMQ 作为一个遵循 AMQP 标准的消息队列系统,充分利用了这些特性,提供了一个高效、可靠、可扩展的消息中间件解决方案。
除了 AMQP,RabbitMQ 还支持其他的协议,例如:
MQTT(消息队列遥测传输):一个轻量级的消息协议,适用于小型设备和移动应用,常用于物联网(IoT)场景。
STOMP(简单文本导向的消息协议):一种简单的互操作协议,设计用于异步消息传递,支持多种语言和环境。
HTTP和WebSockets:通过插件支持,RabbitMQ 可以接受HTTP和WebSockets协议的消息,使其能够更容易地集成到Web应用中。
这种多协议支持使得 RabbitMQ 能够在多种不同的应用场景中使用,从企业级应用到物联网和实时Web应用,都能提供稳定可靠的消息传递服务。
简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。
常见的持久化方式:文件存储和数据库存储
所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。
所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。
如何保证中间件消息的可靠性呢?可以从两个方面考虑:
1:消息的传输:通过协议来保证系统间数据解析的正确性。
2:消息的存储可靠:通过持久化来保证消息的可靠性。
1.默认情况下,rabbitmq是没有安装web端的客户端插件,需要安装才可以生效
rabbitmq-plugins enable rabbitmq_management
安装完毕以后,重启服务即可
systemctl restart rabbitmq-server
在浏览器访问
http://localhost:15672
账号密码都是guest
新增用户
rabbitmqctl add_user admin admin
设置用户分配操作权限
rabbitmqctl set_user_tags admin administrator
用户级别:
1、administrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
2、monitoring 监控者 登录控制台,查看所有信息
3、policymaker 策略制定者 登录控制台,指定策略
4、managment 普通管理员 登录控制台
其他配置
rabbitmqctl add_user 账号 密码
rabbitmqctl set_user_tags 账号 administrator
rabbitmqctl change_password Username Newpassword 修改密码
rabbitmqctl delete_user Username 删除用户
rabbitmqctl list_users 查看用户清单
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*" 为用户设置administrator角色
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
1.none:
不能访问management plugin,也就是不能显示如下界面
2.management:查看自己相关节点信息
列出自己可以通过AMQP登入的虚拟机
查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
查看和关闭自己的channels和connections
查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。
3.Policymaker
包含management所有权限
查看和创建和删除自己的virtual hosts所属的policies和parameters信息。
4.Monitoring
包含management所有权限
罗列出所有的virtual hosts,包括不能登录的virtual hosts。
查看其他用户的connections和channels信息
查看节点级别的数据如clustering和memory使用情况
查看所有的virtual hosts的全局统计信息。
5.Administrator
最高权限
可以创建和删除virtual hosts
可以查看,创建和删除users
查看创建permisssions
关闭所有用户的connections
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。
AMQP生产者流转过程
AMQP消费者流转过程
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。
RabbitMQ 是一种消息代理,它支持不同的消息传递模式,这些模式定义了消息如何从发送者传递到接收者。下面是一些通俗易懂的解释和例子:
简单模式(Simple):
工作队列模式(Work Queues):
发布/订阅模式(也叫Fanout模式)(Publish/Subscribe):
路由模式(也叫direct模式)(Routing):
主题模式(Topics):
参数模式(Headers):
RabbitMQ 通过这些模式提供了灵活的消息路由选项,以适应不同的消息传递场景和需求。
生产者
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ channel.queueDeclare("queue1", false, false, false, null); // 6: 准备发送消息的内容 String message = "你好,啊啊啊!!!"; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routing // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者
import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者"); // 4: 从连接中获取通道channel channel=connection.createChannel(); channel.basicConsume("queue1", true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println("收到消息是" + new String(delivery.getBody(),"utf-8")); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("消息接收失败"); } }); System.out.println("开始接收消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost7"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 6: 准备发送消息的内容 String message = "你好!!!"; String exchangeName = "fanout-exchange"; String routingKey = ""; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routingkey // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者
import java.io.IOException; public class Consumer { private static Runnable runnable = () -> { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //获取队列的名称 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 //channel.queueDeclare("queue1", false, false, false, null); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println(queueName + ":开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } }; public static void main(String[] args) { // 启动三个线程去执行 new Thread(runnable, "queue-1").start(); new Thread(runnable, "queue-2").start(); new Thread(runnable, "queue-3").start(); } }
生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 6: 准备发送消息的内容 String message = "你好!!!"; String exchangeName = "direct-exchange"; String routingKey1 = "testkey"; String routingKey2 = "testkey2"; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routingkey // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish(exchangeName, routingKey1, null, message.getBytes()); channel.basicPublish(exchangeName, routingKey2, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者
import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { private static Runnable runnable = () -> { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //获取队列的名称 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 //channel.queueDeclare("queue1", false, false, false, null); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println(queueName + ":开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } }; public static void main(String[] args) { // 启动三个线程去执行 new Thread(runnable, "queue-1").start(); new Thread(runnable, "queue-2").start(); new Thread(runnable, "queue-3").start(); } }
生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 6: 准备发送消息的内容 String message = "你好!!!"; String exchangeName = "topic-exchange"; String routingKey1 = "com.course.order";//都可以收到 queue-1 queue-2 String routingKey2 = "com.order.user";//都可以收到 queue-1 queue-3 // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routingkey // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish(exchangeName, routingKey1, null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者
import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { private static Runnable runnable = () -> { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //获取队列的名称 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 //channel.queueDeclare("queue1", false, false, false, null); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println(queueName + ":开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } }; public static void main(String[] args) { // 启动三个线程去执行 new Thread(runnable, "queue-1").start(); new Thread(runnable, "queue-2").start(); new Thread(runnable, "queue-3").start(); } }
以上代码都没有绑定关系,因为我在可视化界面完成了绑定关系,所以我偷了个懒
生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 6: 准备发送消息的内容 //===============================end topic模式================================== for (int i = 1; i <= 20; i++) { //消息的内容 String msg = "消息:" + i; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routingkey // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish("", "queue1", null, msg.getBytes()); } System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者1
import com.rabbitmq.client.*; import java.io.IOException; public class Work1 { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者-Work1"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 // channel.queueDeclare("queue1", false, false, false, null); // 同一时刻,服务器只会推送一条消息给消费者 // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue1", true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try{ System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(2000); }catch(Exception ex){ ex.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println("Work1-开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者2
import com.rabbitmq.client.*; import java.io.IOException; public class Work2 { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者-Work2"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 //channel.queueDeclare("queue1", false, true, false, null); // 同一时刻,服务器只会推送一条消息给消费者 //channel.basicQos(1); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue1", true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try{ System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(200); }catch(Exception ex){ ex.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println("Work2-开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 6: 准备发送消息的内容 //===============================end topic模式================================== for (int i = 1; i <= 20; i++) { //消息的内容 String msg = "消息:" + i; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routingkey // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish("", "queue1", null, msg.getBytes()); } System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者1
import com.rabbitmq.client.*; import java.io.IOException; public class Work1 { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者-Work1"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 // channel.queueDeclare("queue1", false, false, false, null); // 同一时刻,服务器只会推送一条消息给消费者 // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue1", false, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try{ System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(2000); finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }catch(Exception ex){ ex.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println("Work1-开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消费者2
import com.rabbitmq.client.*; import java.io.IOException; public class Work2 { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者-Work2"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 //channel.queueDeclare("queue1", false, true, false, null); // 同一时刻,服务器只会推送一条消息给消费者 //channel.basicQos(1); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue1", false, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try{ System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(200); finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }catch(Exception ex){ ex.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println("Work2-开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
代码思路如下图
生产者
在pom.xml引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
在application.yml进行配置
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: localhost
port: 5672
定义订单的生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; @Component public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定义交换机 private String exchangeName = "fanout_order_exchange"; // 2: 路由key private String routeKey = ""; public void makeOrder(Long userId, Long productId, int num) { // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); // 2: 根据商品id productId 去查询商品的库存 // int numstore = productSerivce.getProductNum(productId); // 3:判断库存是否充足 // if(num > numstore ){ return "商品库存不足..."; } // 4: 下单逻辑 // orderService.saveOrder(order); // 5: 下单成功要扣减库存 // 6: 下单完成以后 System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); } }
绑定关系
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue emailQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("weixin.fanout.queue", true); } //Direct交换机 起名:TestDirectExchange @Bean public DirectExchange fanoutOrderExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("fanout_order_exchange", true, false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean public Binding bindingDirect1() { return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with(""); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with(""); } @Bean public Binding bindingDirect3() { return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with(""); } }
消费者
引入的依赖和配置同上
邮件服务
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "email.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.FANOUT) )) @Component public class EmailService { // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值 @RabbitHandler public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("email-------------->" + message); } }
短信服务
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "sms.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.FANOUT) )) @Component public class SMSService { // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值 @RabbitHandler public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("sms-------------->" + message); } }
微信服务
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.FANOUT) )) @Component public class WeixinService { // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值 @RabbitHandler public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("weixin-------------->" + message); } }
进行测试
package com.rabbitmq.springbootrabbitmqfanoutproducer; import com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service.OrderService; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class SpringbootRabbitmqFanoutProducerApplicationTests { @Autowired OrderService orderService; @Test public void contextLoads() throws Exception { for (int i = 0; i < 10; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 10; orderService.makeOrder(userId, productId, num); } } }
依赖和配置跟上面一样
生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; @Component public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定义交换机 private String exchangeName = "direct_order_exchange"; // 2: 路由key private String routeKey = ""; public void makeOrder(Long userId, Long productId, int num) { // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); // 2: 根据商品id productId 去查询商品的库存 // int numstore = productSerivce.getProductNum(productId); // 3:判断库存是否充足 // if(num > numstore ){ return "商品库存不足..."; } // 4: 下单逻辑 // orderService.saveOrder(order); // 5: 下单成功要扣减库存 // 6: 下单完成以后 System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); } }
绑定关系
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue emailQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("weixin.fanout.queue", true); } //Direct交换机 起名:TestDirectExchange @Bean public DirectExchange directOrderExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("direct_order_exchange", true, false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean public Binding bindingDirect1() { return BindingBuilder.bind(weixinQueue()).to(directOrderExchange()).with(""); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with(""); } @Bean public Binding bindingDirect3() { return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with(""); } }
消费者
邮件服务
package com.rabbitmq.springbootrabbitmqfanoutconsumer.consumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "email.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.FANOUT) )) @Component public class EmailService { // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值 @RabbitHandler public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("email-------------->" + message); } }
短信服务
package com.rabbitmq.springbootrabbitmqfanoutconsumer.consumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "sms.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.FANOUT) )) @Component public class SMSService { // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值 @RabbitHandler public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("sms-------------->" + message); } }
微信服务
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.FANOUT) )) @Component public class WeixinService { // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值 @RabbitHandler public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("weixin-------------->" + message); } }
生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; @Component public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定义交换机 private String exchangeName = "direct_order_exchange"; // 2: 路由key private String routeKey = ""; public void makeOrder(Long userId, Long productId, int num) { // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); // 2: 根据商品id productId 去查询商品的库存 // int numstore = productSerivce.getProductNum(productId); // 3:判断库存是否充足 // if(num > numstore ){ return "商品库存不足..."; } // 4: 下单逻辑 // orderService.saveOrder(order); // 5: 下单成功要扣减库存 // 6: 下单完成以后 System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); } }
绑定
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue emailQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("weixin.fanout.queue", true); } //Direct交换机 起名:TestDirectExchange @Bean public DirectExchange directOrderExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("direct_order_exchange", true, false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean public Binding bindingDirect1() { return BindingBuilder.bind(weixinQueue()).to(directOrderExchange()).with(""); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with(""); } @Bean public Binding bindingDirect3() { return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with(""); } }
消费者
邮件服务
package com.rabbitmq.springbootrabbitmqfanoutconsumer.consumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "email.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.FANOUT) )) @Component public class EmailService { // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值 @RabbitHandler public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("email-------------->" + message); } }
短信服务
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "sms.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.FANOUT) )) @Component public class SMSService { // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值 @RabbitHandler public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("sms-------------->" + message); } }
微信服务
package com.rabbitmq.springbootrabbitmqfanoutconsumer.consumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.FANOUT) )) @Component public class WeixinService { // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值 @RabbitHandler public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("weixin-------------->" + message); } }
测试
import com.rabbitmq.springbootrabbitmqfanoutproducer.service.OrderService; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class SpringbootRabbitmqFanoutProducerApplicationTests { @Autowired OrderService orderService; @Test public void contextLoads() throws Exception { for (int i = 0; i < 10; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 10; orderService.makeOrder(userId, productId, num); } } }
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import java.util.HashMap; import java.util.Map; /** * @author: 学相伴-飞哥 * @description: Producer 简单队列生产者 * @Date : 2021/3/2 */ public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ Map<String,Object> args2 = new HashMap<>(); args2.put("x-message-ttl",5000); channel.queueDeclare("ttl.queue", true, false, false, args2); // 6: 准备发送消息的内容 String message = "你好!!!"; Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x", "1"); headers.put("y", "1"); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .deliveryMode(2) // 传送方式 .priority(1) .contentEncoding("UTF-8") // 编码方式 .expiration("3000") // 过期时间 .headers(headers).build(); //自定义属性 // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routing // @params3: 属性配置 // @params4: 发送消息的内容 for (int i = 0; i <100 ; i++) { channel.basicPublish("", "ttl.queue", basicProperties, message.getBytes()); System.out.println("消息发送成功!"); Thread.sleep(1000); } } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
测试
import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ // 这里如果queue已经被创建过一次了,可以不需要定义 //channel.queueDeclare("queue1", false, false, false, null); // 6: 定义接受消息的回调 Channel finalChannel = channel; finalChannel.basicConsume("ttl.queue", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(properties); System.out.println("获取的消息是:" + new String(body,"UTF-8")); } }); System.out.println("开始接受消息"); System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
消息的过期时间;只需要在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。在测试类中编写如下方法发送消息并设置过期时间到队列:
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; public class MessageTTLProducer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ channel.queueDeclare("ttl.queue2", true, false, false, null); // 6: 准备发送消息的内容 String message = "你好!!!"; Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x", "1"); headers.put("y", "1"); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .deliveryMode(2) // 传送方式 .priority(1) .contentEncoding("UTF-8") // 编码方式 .expiration("5000") // 过期时间 .headers(headers).build(); //自定义属性 // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routing // @params3: 属性配置 // @params4: 发送消息的内容 for (int i = 0; i <10 ; i++) { channel.basicPublish("", "ttl.queue2", basicProperties, message.getBytes()); System.out.println("消息发送成功!"); } } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
异步处理:
应用解耦:
负载均衡:
消息缓冲:
跨语言通信:
日志收集:
分布式事务:
RabbitMQ 的这些使用场景展示了它在现代软件架构中的多样性和强大功能,使得它成为系统间通信和集成的重要工具。
死信队列(DLQ,Dead-Letter Queue)在消息队列系统中是一个重要的概念,它用来收集无法被正常消费的消息。在 RabbitMQ 中,死信队列的设置需要通过几个步骤配置,包括指定死信交换机(DLX,Dead-Letter Exchange)和死信路由键。消息可能因为以下几种情况进入死信队列:
basic.reject
或 basic.nack
拒绝消息,并且设置 requeue
参数为 false
,表示不重新排队。TTL
(Time-To-Live)。死信队列提供了一种机制,允许开发者处理那些无法正常处理的消息,而不是简单地丢弃这些消息,从而增加了系统的健壮性和可调试性。利用死信队列,开发者可以:
假设你有一个电子商务系统,其中包含一个处理订单的消息队列。当订单服务从队列接收消息并试图处理订单时,可能会遇到以下问题导致无法处理某些订单消息:
在这种情况下,你可以配置一个死信队列,所有无法处理的订单消息都会被路由到这个队列。然后,你可以定期检查死信队列,分析问题原因,并对这些失败的订单进行手动处理或重试。
在 RabbitMQ 中配置死信队列涉及以下步骤:
通过这种方式,死信队列为系统提供了一种安全网,确保所有消息都可以被追踪和处理,即使它们在第一次处理时失败了。
RabbitMQ 使用信道(Channels)而非直接通过 TCP 连接进行通信的主要原因是效率和资源管理。
TCP连接的成本:建立 TCP 连接涉及三次握手过程,这在频繁创建和销毁连接的场景中是非常昂贵的。而且,操作系统对同时打开的 TCP 连接数量有限制,这可能成为系统的瓶颈。
多路复用:信道允许在一个 TCP 连接上多路复用,即多个信道共享一个 TCP 连接。这样,多个信道可以并行运行在一个连接上,每个信道可以执行不同的任务(如发送消息、接收消息等),而无需为每个信道建立新的 TCP 连接。这大大减少了网络开销和资源消耗。
隔离和安全:信道在逻辑上隔离了不同的消息流。即使多个信道共享同一个 TCP 连接,它们也可以独立工作,互不干扰。这提供了一种有效的方式来隔离不同类型的消息传递活动,并确保它们的独立性。
灵活的资源管理:通过使用信道,RabbitMQ 能够更灵活地管理资源和控制消息流。例如,可以对不同的信道设置不同的消息流控制策略,或者根据需要动态地添加或删除信道,而不影响到现有的 TCP 连接。
简化客户端连接:对于客户端来说,维护一个稳定的 TCP 连接比频繁地打开和关闭连接要简单得多。通过信道,客户端可以在不重新建立 TCP 连接的情况下,动态地增加消息发布或消费的并行度。
总之,信道提供了一种高效、灵活且资源友好的方式来处理大量并发的消息传递活动,这是直接使用 TCP 连接所无法实现的。
在 RabbitMQ 中,队列(Queue)的创建可以由生产者(Producer)、消费者(Consumer)或者两者都进行,这取决于应用程序的设计和需求。关键点在于确保队列在被生产者发送消息或消费者尝试从中读取消息之前已经存在。
生产者创建队列:
消费者创建队列:
两者都可以创建队列:
最佳实践:
最终,谁创建队列取决于你的具体应用场景和架构设计。无论哪种方式,关键是确保在尝试使用队列之前,队列已经被正确创建并配置。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。