赞
踩
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6种模式:
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
RabbitMQ是AMQP协议的Erlang的实现。
概念 | 说明 |
---|---|
连接Connection | 一个网络连接,比如TCP/IP套接字连接。 |
信道Channel | 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。 |
客户端Client | AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。 |
服务节点Broker | 消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。 |
端点 | AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。 |
消费者Consumer | 一个从消息队列里请求消息的客户端程序。 |
生产者Producer | 一个向交换机发布消息的客户端应用程序。 |
首先创建一个空工程,作为父工程。
在父工程下创建一个生产者工程项目mq_simple_publisher
在父工程下创建一个生产者工程项目mq_simple_consumer
往两个项目的pom.xml文件中添加依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
1️⃣ 创建工程(生产者、消费者)
2️⃣ 添加依赖
3️⃣ 编写生产者发送信息
4️⃣ 编写消费者接收信息
在上图的模型中,有以下概念:
package com.soberw.simple; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author soberw * @Classname Publisher * @Description 生产者:向消息中间件上发送消息数据的应用程序 * @Date 2022-05-31 15:19 */ public class Publisher { public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置mq服务器连接信息 //设置虚拟主机名称 默认是 / factory.setVirtualHost("/"); //设置mq服务器连接地址 默认是 localhost factory.setHost("192.168.6.200"); //设置连接用户名 默认为 guest factory.setUsername("soberw"); //设置密码 默认为 guest factory.setPassword("123456"); //设置连接端口 默认为 5672 factory.setPort(5672); //2、创建连接 Connection connection = factory.newConnection(); //3、在连接上创建频道(信道),信道相当于一个逻辑上的连接。 // 为了提高系统性能,不是每次都连接或者关闭connection // 而是每次都打开或者关闭信道 Channel channel = connection.createChannel(); //4、声明(创建)队列 /* * queue 参数1:队列名称 * durable 参数2:是否定义持久化队列,当mq重启之后,是否还在 * exclusive 参数3:是否独占本次连接(是否为排他队列),这个队列是否只限于当前连接使用。如果连接关闭,那么队列自动被删除 * ① 是否独占,只能有一个消费者监听这个队列 * ② 当connection关闭时,是否删除队列 * autoDelete 参数4:是否在不使用的时候自动删除队列,当没有consumer时,自动删除。队列长时间没有使用,服务会自动删除队列 * arguments 参数5:队列其它参数,队列的一些属性。例如:队列超时时间、队列连接长度等... */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //5、发送信息 String message = "Hello,RabbitMQ!"; /* * 参数1:交换机名称,如果没有指定则使用默认Default Exchange,简单模式下默认为“”即可 * 参数2:路由key,简单模式可以传递队列名称 * 参数3:配置信息,消息属性信息 * 参数4:消息内容 */ AMQP.BasicProperties props = new AMQP.BasicProperties(); //构建之后一定要赋值给一个新的对象,要不然无效 AMQP.BasicProperties build = props.builder().appId("app01").userId("soberw").messageId("msg01").build(); channel.basicPublish("", QUEUE_NAME, build, message.getBytes()); System.out.println("已经发送的信息:" + message); //6、关闭资源 channel.close(); connection.close(); } }
运行程序:http://192.168.6.200:15672
在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:
package com.soberw.simple; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author soberw * @Classname Consumer * @Description 从消息中间件上获取消息,并处理消息的应用程序 * @Date 2022-05-31 16:18 */ public class Consumer { public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接 ConnectionFactory factory = new ConnectionFactory(); //设置mq服务器连接信息 //设置虚拟主机名称 默认是 / factory.setVirtualHost("/"); //设置mq服务器连接地址 默认是 localhost factory.setHost("192.168.6.200"); //设置连接用户名 默认为 guest factory.setUsername("soberw"); //设置密码 默认为 guest factory.setPassword("123456"); //设置连接端口 默认为 5672 factory.setPort(5672); //2、创建连接 Connection connection = factory.newConnection(); //3、在连接上创建频道(信道) Channel channel = connection.createChannel(); //4、声明(创建)队列(可以省略) // 原则上消费者是可以省略不写的,但是考虑到容错性的问题,建议写上 // 因为如果生产者还没有进入队列的时候,如果消费者先进入队列,则会报错 // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //5、获取并处理消息 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 回调方法,当收到消息后,会自动执行该方法 * @param consumerTag 标识 * @param envelope 获取一些信息,交换机,路由key。。。 * @param properties 配置信息 * @param body 数据体 * @throws IOException io异常 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag = " + consumerTag); System.out.println("DeliveryTag = " + envelope.getDeliveryTag()); //消息唯一标识。类似于ID System.out.println("Exchange = " + envelope.getExchange()); //获取消息是从那个交换机过来的 System.out.println("RoutingKey = " + envelope.getRoutingKey()); //获取队列绑定交换机的路由 key System.out.println("properties = " + properties); System.out.println("body = " + new String(body)); //下面就可以进行一些操作,例如:将信息保存到数据库,给用户发消息,发短息,记录日志等 } }; /* * 消费者类似一个监听程序,主要是用来监听消息 * 参数: * 1、queue: 队列名称 * 2、autoAck:是否自动确认,类似于发短息的时候,发送成功手机会收到一个确认信息 * 3、callback:回调对象 */ channel.basicConsume(QUEUE_NAME, true, consumer); //6、关闭资源 // channel.close(); // connection.close(); } }
取出消息并显示,注意此时并未关闭连接,表示消费者亦然处于开启状态,此时如果生产者再次发送消息,依然可以接受到:
当消息被从消息队列
中取出后,不管连接是否关闭,消息队列中对应的消息就不存在了:
在上面案例中:
由于下面对每个模式的测试都大致要遵循此流程,只是涉及到具体的操作方法会有所不同,因此这里可以对建立连接进行一个抽取,抽取出来一个工具类:
package com.soberw.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author soberw * @Classname ConnectionUtil * @Description 连接抽取出来的工具类 * @Date 2022-05-31 20:45 */ public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("192.168.6.200"); //端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/"); factory.setUsername("soberw"); factory.setPassword("123456"); // 通过工程获取连接 return factory.newConnection(); } //测试一下 public static void main(String[] args) throws IOException, TimeoutException { Connection connection = getConnection(); //connection = amqp://soberw@192.168.6.200:5672/ System.out.println("connection = " + connection); } }
Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
工作队列模式实际上是一种竞争关系的模式,多个消费者之间是竞争关系,即一条消息如果被某个消费者消费了,那么其他的消费者就获取不到了。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
因为是多个消费者共同消费同一个生产者的消息,因此创建多一个消费者。
package com.soberw.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.soberw.util.ConnectionUtil; /** * @author soberw * @Classname Publisher * @Description work消息模型:多个消费者共同消费一个队列的消息。目的:提高消息处理速度 * @Date 2022-05-31 20:57 */ public class Publisher { /** * 模拟生产者生产速度快 */ static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i = 1; i <= 10; i++) { String body = i+"hello rabbitmq~~~"; channel.basicPublish("",QUEUE_NAME,null,body.getBytes()); } channel.close(); connection.close(); } }
按照上面的入门案例,编写两个消费者代码:
package com.soberw.work; import com.rabbitmq.client.*; import com.soberw.util.ConnectionUtil; import java.io.IOException; /** * @author soberw * @Classname consumer1 * @Description * @Date 2022-05-31 21:29 */ public class Consumer1 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body1:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
package com.soberw.work; import com.rabbitmq.client.*; import com.soberw.util.ConnectionUtil; import java.io.IOException; /** * @author soberw * @Classname Consumer2 * @Description * @Date 2022-05-31 21:30 */ public class Consumer2 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body2:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
先开启两个消费者程序,然后运行生产者程序:
看似是公平竞争的关系?
两个消费者都抢占到了5个消息。
下面我们设置一些不公平的场景,通过给两个消费者添加不同的休眠时间来设置。
让消费者1在获取到消息后,休眠1秒钟;让消费者2在获取到消息后,休眠2秒钟。
package com.soberw.work; import com.rabbitmq.client.*; import com.soberw.util.ConnectionUtil; import java.io.IOException; /** * @author soberw * @Classname consumer1 * @Description * @Date 2022-05-31 21:29 */ public class Consumer1 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body1:" + new String(body)); //添加休眠时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
代码同消费者1,将休眠时间改为2秒。
发现此时依然是公平的竞争关系,虽然设置了不同的消息处理时间,但是最后不同消费者还是会拿到同等的消息。
上面的消息处理方式都是设置的自动确认
:
这里需要改为手动确认
。
因为是手动确认,所以需要我们在程序中添加确认接收
的代码,即只有将消息处理无误后,才确认。
package com.soberw.work; import com.rabbitmq.client.*; import com.soberw.util.ConnectionUtil; import java.io.IOException; /** * @author soberw * @Classname consumer1 * @Description * @Date 2022-05-31 21:29 */ public class Consumer1 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body1:" + new String(body)); //添加休眠时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //在获取到消息后,可以进行一些操作,例如数据库操作、发送邮件、记录日志等... /* * 确认消息一般放在最后,等程序执行完无异常后,在进行确认消息 * * 第一个参数:添加手动确认消息的唯一标识 * 第二个参数:确认方式。是否一次抓取并确认多个消息 * true:设置为true,即表示可以批量确认消息,这样可以挺高执行速度,当消息量大时,避免繁琐的处理, * 但是队列会一次性将多个消息同时从队列中删除,也会导致系统的不稳定性,存在安全隐患 * false:设置为false,即表示无论多少个消息,都需要一次一次的确认,此设置会可能影响处理的性能 * 实际中可根据不同场景进行设置... */ channel.basicAck(envelope.getDeliveryTag(), false); //手动确认,业务逻辑没有问题后才确认 } }; //初始的自动确认 //channel.basicConsume(QUEUE_NAME, true, consumer); //改为手动确认 channel.basicConsume(QUEUE_NAME, false, consumer); } }
与消费者1同理,改为手动确认。
发现还是公平竞争的关系。
但这显然不符合所想,我们预期的结果是,处理消息快的消费者,应当分配多一点的消息,即按劳分配,能者多劳。
如果采用上面的默认消息分发策略,消息是轮询发送的。但是消费者之间存在处理快慢问题,如果A处理慢,B处理快,他们接受同样数量的消息显然是不合理的。就是在这样情况下,不公平分发出现了,简而言之就是能者多劳,处理快的多处理,处理慢的少处理。
如何实现不公平分发?
这里涉及到一个概念:
当网络发生拥塞的时候,所有的数据流都有可能被丢弃;为满足用户对不同应用不同服务质量的要求,就需要网络能根据用户的要求分配和调度资源,对不同的数据流提供不同的服务质量:对实时性强且重要的数据报文优先处理;对于实时性不强的普通数据报文,提供较低的处理优先级,网络拥塞时甚至丢弃。QoS应运而生。支持QoS功能的设备,能够提供传输品质服务;针对某种类别的数据流,可以为它赋予某个级别的传输优先级,来标识它的相对重要性,并使用设备所提供的各种优先级转发策略、拥塞避免等机制为这些数据流提供特殊的传输服务。配置了QoS的网络环境,增加了网络性能的可预知性,并能够有效地分配网络带宽,更加合理地利用网络资源。
在RabbitMQ中,我们可以设置预先抓取值来实现不公平分发:
先设置抓取数量为 1
package com.soberw.work; import com.rabbitmq.client.*; import com.soberw.util.ConnectionUtil; import java.io.IOException; /** * @author soberw * @Classname consumer1 * @Description * @Date 2022-05-31 21:29 */ public class Consumer1 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); //设置预先抓取消息的数量,消费完成抓取数量后的消息后再来继续抓取,注意此时必须改为手动确认 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body1:" + new String(body)); //添加休眠时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //在获取到消息后,可以进行一些操作,例如数据库操作、发送邮件、记录日志等... /* * 确认消息一般放在最后,等程序执行完无异常后,在进行确认消息 * * 第一个参数:添加手动确认消息的唯一标识 * 第二个参数:确认方式。是否一次抓取并确认多个消息 * true:设置为true,即表示可以批量确认消息,这样可以挺高执行速度,当消息量大时,避免繁琐的处理, * 但是队列会一次性将多个消息同时从队列中删除,也会导致系统的不稳定性,存在安全隐患 * false:设置为false,即表示无论多少个消息,都需要一次一次的确认,此设置会可能影响处理的性能 * 实际中可根据不同场景进行设置... */ channel.basicAck(envelope.getDeliveryTag(), true); //手动确认,业务逻辑没有问题后才确认 } }; //初始的自动确认 //channel.basicConsume(QUEUE_NAME, true, consumer); //改为手动确认 channel.basicConsume(QUEUE_NAME, false, consumer); } }
同消费者1,设置抓取数量为 1 。
为了直观的看到程序的流程,我们加上断点测试一下,加在确认消息的位置:
生产者发送10条消息,消费者1和2都预先抓取了一条消息,并执行到确认消息处暂停:
此时查看消息队列:
存在两条为确认的消息,分别对应着两个消费者抓取的消息。
此时先放行消费者1,即让消费者1确认消息:
发现队列中的消息减少了一个,观察控制台:
成功取出。
此时删除断点,让两个消费者自然执行:
我们发现,消费者1因为执行速度快,就得到了更多的消息。
上面我们通过设置预先抓取值,可以实现按劳分配的不公平分发效果,设置的值为1,如果设置为2呢?
依然断点启动,方便查看效果:
两个消费者各自抓取了两个。
此时放行消费者2:
我们发现,消费者2成功取出一个消息,队列中删除了一个,但是显示未确认的消息数量还是4个,这与其执行机制有关,我们先放行:
依然是消费者1执行的多。
抓取值机制:
- 我们给消费程序设置的预先抓取值,更像是一个分配阀值,当队列发现程序中设置的抓取值与实际数量不等时,如果队列中还有消息,就给其分配消息,直到与预先抓取值相同为止。
- 这也就是为什么队列中始终有4个未确认消息的原因
- 可以认为,队列分配消息的方式是渐进式的,而不是隔断式的
- 因此,在同等执行速度下,预先抓取值设置的越大,则抓取的消息越多
下面验证一下:
将消费者1设置为2
,消费者2设置为1
,将他们的休眠时间都设置为1
秒
对比上面两组测试,得出结论:
- 在同等执行效率的情况下,设置的预先抓取值越大,则抓取的消息越多
- 在设置同等预先抓取值的情况下,执行效率越快,则抓取的消息越多
订阅模式示例图:
前面2个案例中,只有3个角色:
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
发布订阅模式:
需要创建交换机以进行消息转发:
package com.soberw.fanout; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.soberw.util.ConnectionUtil; /** * @author soberw * @Classname Publisher * @Description Publish/Subscribe发布与订阅模式的生产者应用程序 * @Date 2022-06-01 11:00 */ public class Publisher { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); /* *exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) *参数: * 1. exchange:交换机名称 * 2. type:交换机类型 * DIRECT("direct"),:定向 * FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定的队列。 * TOPIC("topic"),通配符的方式,发送给符合通配符条件的队列 * HEADERS("headers");参数匹配 * 3. durable:是否持久化 * 4. autoDelete:自动删除 * 5. internal:内部使用。 一般false * 6. arguments:参数 */ String exchangeName = "test_fanout"; //5. 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null); //6. 创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); //7. 绑定队列和交换机 /* *queueBind(String queue, String exchange, String routingKey) *参数: * 1. queue:队列名称 * 2. exchange:交换机名称 * 3. routingKey:路由键,绑定规则 * 如果交换机的类型为fanout ,routingKey设置为"" */ channel.queueBind(queue1Name, exchangeName, ""); channel.queueBind(queue2Name, exchangeName, ""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; //8. 发送消息 channel.basicPublish(exchangeName, "", null, body.getBytes()); //9. 释放资源 channel.close(); connection.close(); } }
package com.soberw.fanout; import com.rabbitmq.client.*; import com.soberw.util.ConnectionUtil; import java.io.IOException; /** * @author soberw * @Classname Consumer * @Description Publish/Subscribe发布与订阅模式的消费者应用程序 * @Date 2022-06-01 11:00 */ public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name, true, consumer); } }
将队列名称改为2号队列,其他一样。
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
发现,两个消费者都成功取到了消息,并且情空了各自的队列:
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。、
发布订阅模式与工作队列模式的区别
- 1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
- 2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
- 3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。
路由模式特点:
图解:
在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct
,还有队列绑定交换机的时候需要指定routing key
。
package com.soberw.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.soberw.util.ConnectionUtil; /** * @author soberw * @Classname Publisher * @Description Routing路由模式的生产者应用程序 * @Date 2022-06-01 12:55 */ public class Publisher { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //定义交换机名字 String exchangeName = "test_direct"; // 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null); // 创建队列 String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; // 声明(创建)队列 channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); // 队列绑定交换机 // 队列1绑定error channel.queueBind(queue1Name, exchangeName, "error"); // 队列2绑定info warning channel.queueBind(queue2Name, exchangeName, "info"); channel.queueBind(queue2Name, exchangeName, "warning"); String errorMessage = "日志信息:张三调用了delete方法.错误了,日志级别error..."; String warningMessage = "日志信息:张三调用了delete方法.错误了,日志级别warning..."; String infoMessage = "日志信息:张三调用了delete方法.错误了,日志级别info..."; // 发送消息 channel.basicPublish(exchangeName, "error", null, errorMessage.getBytes()); channel.basicPublish(exchangeName, "warning", null, warningMessage.getBytes()); channel.basicPublish(exchangeName, "info", null, infoMessage.getBytes()); channel.close(); connection.close(); } }
package com.soberw.routing; import com.rabbitmq.client.*; import com.soberw.util.ConnectionUtil; import java.io.IOException; /** * @author soberw * @Classname Consumer1 * @Description Routing路由模式的消费者应用程序 * @Date 2022-06-01 12:55 */ public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name, true, consumer); } }
消费者2从队列2中取消息。
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
Topic主题模式也叫通配符模式。Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”
分割,例如: item.insert
通配符规则:
#:匹配零个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert
图解:
使用topic类型的Exchange,发送不同消息的routing key。
package com.soberw.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.soberw.util.ConnectionUtil; /** * @author soberw * @Classname Publisher * @Description Topic通配符模式的生产者程序 * @Date 2022-06-01 14:51 */ public class Publisher { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); // 绑定队列和交换机 /* * 参数: * 1. queue:队列名称 * 2. exchange:交换机名称 * 3. routingKey:路由键,绑定规则 * 如果交换机的类型为fanout ,routingKey设置为"" */ // routing key 系统的名称.日志的级别。 //需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name, exchangeName, "#.error"); //匹配零个或者多个以.error结束的词 channel.queueBind(queue1Name, exchangeName, "order.*"); //匹配一个以order.开头的词 channel.queueBind(queue2Name, exchangeName, "*.*"); //匹配两个以 . 分割的词 //定义不同的routing key String key1 = "order.info.error"; //可被队列 1 匹配到 String key2 = "order.info"; //可被队列 1 2 匹配到 String key3 = "goods.info"; //可被队列 2 匹配到 String key4 = "goods.error"; //可被队列 1 2 匹配到 String body1 = "日志信息:张三通过" + key1 + "调用了方法..."; String body2 = "日志信息:张三通过" + key2 + "调用了方法..."; String body3 = "日志信息:张三通过" + key3 + "调用了方法..."; String body4 = "日志信息:张三通过" + key4 + "调用了方法..."; //发送消息 channel.basicPublish(exchangeName, key1, null, body1.getBytes()); channel.basicPublish(exchangeName, key2, null, body2.getBytes()); channel.basicPublish(exchangeName, key3, null, body3.getBytes()); channel.basicPublish(exchangeName, key4, null, body4.getBytes()); channel.close(); connection.close(); } }
package com.soberw.topic; import com.rabbitmq.client.*; import com.soberw.util.ConnectionUtil; import java.io.IOException; /** * @author soberw * @Classname Consumer1 * @Description Topic通配符模式的消费者程序 * @Date 2022-06-01 14:50 */ public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_topic_queue1"; Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); } }; channel.basicConsume(queue1Name, true, consumer); } }
同消费者1 一样,接收来自队列2 的消息。
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
RabbitMQ工作模式:
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5、通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。