赞
踩
简介:
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
下面介绍常用的五大模式:
首先建工程引入相关的依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.aaa</groupId> <artifactId>rabbit-parent</artifactId> <packaging>pom</packaging> <version>1.0-SNAPSHOT</version> <!--子模块--> <modules> <module>consumer</module> <module>product</module> </modules> <dependencies> <!--引入rabbitMQ的依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency> </dependencies> </project>
工程图浏览:
consumer和product都是子项目。
从上图可以看到一共三个角色
p(product):代表的是生产者,用来生产消息的。
红色的中间部分(queue):代表的是一个队列,是用来存储消息和发送消息的。
c(consumer):代表的是消费者,用来消费消息的。
应用场景:
手机短信,邮件单发
生产者的代码:
package com.aaa.myf; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Product { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //创建队列 /** * String queue, 队列的名称 * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。 * boolean exclusive, 是否独占 false * boolean autoDelete, 是否自动删除 如果长时间没有发生消息 则自动删除 * Map<String, Object> arguments 额外参数 先给null *//*"qy129_shuaifei",true,false,false,null*/ channel.queueDeclare("qy129_shuaifei",true,false,false,null); String msg="毛云飞不是最帅的么!!!!!!!"; /** * String exchange,交换机的名称,如果没有则使用"",它会自动采用默认 * String routingKey,路由key,如果没有交换机的绑定,使用队列的名称 * BasicProperties props,消息的一些额外配置,目前先不加null * byte[] body 消息的内容 */ channel.basicPublish("","qy129_shuaifei",null,msg.getBytes()); } }
消费者代码
package com.aaa.myf; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受信息 /** * (String queue, 队列的名称 * boolean autoAck, 是否自动确认 * Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body接受的消息 System.out.println("消息的内容:"+new String(body)); } }; channel.basicConsume("qy129_shuaifei",true,defaultConsumer); } }
特点:
一个生产者
一个中间队列
多个消费者
是一对多的关系
消费者之间的关系是竞争关系
用处及应用场景:
比如批量处理上,rabbitmq里面挤压了大量的信息
抢红包
资源分配系统
生产者:
package com.aaa.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 工作者模式: * 特点:一个生产者 * 多个消费者 * 统一一个队列 * 这些消费者之间存在竞争关系 */ public class Product { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //创建队列 /** * String queue, 队列的名称 * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。 * boolean exclusive, 是否独占 false * boolean autoDelete, 是否自动删除 如果长时间没有发生消息 则自动删除 * Map<String, Object> arguments 额外参数 先给null *//*"qy129_shuaifei",true,false,false,null*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ channel.queueDeclare("qy129_work",true,false,false,null); for (int i = 0; i < 10; i++) { String msg="今天天气不错,加油!!!!!!!"; /** * String exchange,交换机的名称,如果没有则使用"",它会自动采用默认 * String routingKey,路由key,如果没有交换机的绑定,使用队列的名称 * BasicProperties props,消息的一些额外配置,目前先不加null * byte[] body 消息的内容 */ channel.basicPublish("","qy129_work",null,msg.getBytes()); } //生产者这里可以管理资源,消费者不能关闭资源,他一直处于一个监听的状态 channel.close(); connection.close(); } }
消费者1号:
package com.aaa.work; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer01 { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受信息 /** * (String queue, 队列的名称 * boolean autoAck, 是否自动确认 * Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //加上休眠时间是为了区分两个消费者的区别 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //body接受的消息 System.out.println("消息者01:"+new String(body)); } }; channel.basicConsume("qy129_work",true,defaultConsumer); } }
消费者2号
package com.aaa.work; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer02 { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受信息 /** * (String queue, 队列的名称 * boolean autoAck, 是否自动确认 * Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body接受的消息 System.out.println("消息者02:"+new String(body)); } }; channel.basicConsume("qy129_work",true,defaultConsumer); } }
特点:
一个生产者
交换机转发消息
多个个队列
多个消费者
应用场景:
消息推送,广告
生产者:
package com.aaa.fanout; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 订阅者模式: * 一个生产者 * 多个消费者 * 多个订阅 * 交换机转发消息 */ public class Product { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //创建队列 /** * String queue, 队列的名称 * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。 * boolean exclusive, 是否独占 false * boolean autoDelete, 是否自动删除 如果长时间没有发生消息 则自动删除 * Map<String, Object> arguments 额外参数 先给null *//*"qy129_shuaifei",true,false,false,null*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ channel.queueDeclare("qy129_fanout01",true,false,false,null); channel.queueDeclare("qy129_fanout02",true,false,false,null); /** * 创建交换机 * String exchange,交换机的名称 * BuiltinExchangeType type,交换机的类型 * boolean durable是否持久化 */ channel.exchangeDeclare("qy129_angermao", BuiltinExchangeType.FANOUT,true); /** * String queue,队列名 * String exchange,交换机的名称 * String routingKey;路由key,如果交换机为fanout模式则不需要路由key */ channel.queueBind("qy129_fanout01","qy129_angermao",""); channel.queueBind("qy129_fanout02","qy129_angermao",""); for (int i = 0; i < 10; i++) { String msg="今天天气不错,加油!!!!!!!"+i; /** * String exchange,交换机的名称,如果没有则使用"",它会自动采用默认 * String routingKey,路由key,如果没有交换机的绑定,使用队列的名称 * BasicProperties props,消息的一些额外配置,目前先不加null * byte[] body 消息的内容 */ channel.basicPublish("qy129_angermao","",null,msg.getBytes()); } //生产者这里可以管理资源,消费者不能关闭资源,他一直处于一个监听的状态 channel.close(); connection.close(); } }
消费者01:
package com.aaa.fanout; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer01 { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受信息 /** * (String queue, 队列的名称 * boolean autoAck, 是否自动确认 * Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //加上休眠时间是为了区分两个消费者的区别 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //body接受的消息 System.out.println("消息者01:"+new String(body)); } }; channel.basicConsume("qy129_fanout01",true,defaultConsumer); } }
消费者02:
package com.aaa.fanout; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer02 { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受信息 /** * (String queue, 队列的名称 * boolean autoAck, 是否自动确认 * Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body接受的消息 System.out.println("消息者02:"+new String(body)); } }; channel.basicConsume("qy129_fanout02",true,defaultConsumer); } }
特点:
一个生产者
交换机 转发消息
有多个队列
routekey:路由key routekey匹配的消息可以达到对应的序列
应用场景:
短信,聊天工具,邮箱。。
手机号/邮箱地址,都可以是路由key
生产者:
package com.aaa.direct; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 路由模式: * 一个生产者 * 多个消费者 * 多个队列 * 交换机转发消息 * routekey:路由key只要routekey匹配的消息可以 */ public class Product { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //创建队列 /** * String queue, 队列的名称 * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。 * boolean exclusive, 是否独占 false * boolean autoDelete, 是否自动删除 如果长时间没有发生消息 则自动删除 * Map<String, Object> arguments 额外参数 先给null *//*"qy129_shuaifei",true,false,false,null*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ channel.queueDeclare("qy129_direct01",true,false,false,null); channel.queueDeclare("qy129_direct02",true,false,false,null); /** * 创建交换机 * String exchange,交换机的名称 * BuiltinExchangeType type,交换机的类型 * boolean durable是否持久化 */ channel.exchangeDeclare("qy129_direct", BuiltinExchangeType.FANOUT,true); /** * String queue,队列名 * String exchange,交换机的名称 * String routingKey;路由key,如果交换机为fanout模式则不需要路由key */ channel.queueBind("qy129_direct01","qy129_direct","error"); channel.queueBind("qy129_direct02","qy129_direct","info"); channel.queueBind("qy129_direct02","qy129_direct","error"); channel.queueBind("qy129_direct02","qy129_direct","warning"); for (int i = 0; i < 10; i++) { String msg="今天天气不错,加油!!!!!!!"+i; /** * String exchange,交换机的名称,如果没有则使用"",它会自动采用默认 * String routingKey,路由key,如果没有交换机的绑定,使用队列的名称 * BasicProperties props,消息的一些额外配置,目前先不加null * byte[] body 消息的内容 */ channel.basicPublish("qy129_direct","error",null,msg.getBytes()); } //生产者这里可以管理资源,消费者不能关闭资源,他一直处于一个监听的状态 channel.close(); connection.close(); } }
消费者01:
package com.aaa.direct; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer01 { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受信息 /** * (String queue, 队列的名称 * boolean autoAck, 是否自动确认 * Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //加上休眠时间是为了区分两个消费者的区别 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //body接受的消息 System.out.println("消息者01:"+new String(body)); } }; channel.basicConsume("qy129_direct01",true,defaultConsumer); } }
消费者02:
package com.aaa.direct; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer02 { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受信息 /** * (String queue, 队列的名称 * boolean autoAck, 是否自动确认 * Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body接受的消息 System.out.println("消息者02:"+new String(body)); } }; channel.basicConsume("qy129_direct02",true,defaultConsumer); } }
绑定按照统配符的模式:
*:匹配一个单词
#:统配n个字符
应用场景:
做物流分拣的多级传递.
生产者:
package com.aaa.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * topic主体模式: * 一个生产者 * 多个消费者 * 绑定按照通配符的模式 * *:统配一个单词 * #:统配n个单词 */ public class Product { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //创建队列 /** * String queue, 队列的名称 * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。 * boolean exclusive, 是否独占 false * boolean autoDelete, 是否自动删除 如果长时间没有发生消息 则自动删除 * Map<String, Object> arguments 额外参数 先给null *//*"qy129_shuaifei",true,false,false,null*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/ channel.queueDeclare("qy129_topic01",true,false,false,null); channel.queueDeclare("qy129_topic02",true,false,false,null); /** * 创建交换机 * String exchange,交换机的名称 * BuiltinExchangeType type,交换机的类型 * boolean durable是否持久化 */ channel.exchangeDeclare("qy129_exchange_topic", BuiltinExchangeType.TOPIC,true); /** * String queue,队列名 * String exchange,交换机的名称 * String routingKey;路由key,如果交换机为fanout模式则不需要路由key */ channel.queueBind("qy129_topic01","qy129_exchange_topic","*.orange.*"); channel.queueBind("qy129_topic02","qy129_exchange_topic","*.*.rabbit"); channel.queueBind("qy129_topic02","qy129_exchange_topic","lazy.#"); for (int i = 0; i < 10; i++) { String msg="今天很生气,啥也不想说,明天加油,依旧光芒万丈!!!!!!!"+i; /** * String exchange,交换机的名称,如果没有则使用"",它会自动采用默认 * String routingKey,路由key,如果没有交换机的绑定,使用队列的名称 * BasicProperties props,消息的一些额外配置,目前先不加null * byte[] body 消息的内容 */ channel.basicPublish("qy129_exchange_topic","lazy.orange.rabbit",null,msg.getBytes()); } //生产者这里可以管理资源,消费者不能关闭资源,他一直处于一个监听的状态 channel.close(); connection.close(); } }
消费者01:
package com.aaa.topic; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer01 { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受信息 /** * (String queue, 队列的名称 * boolean autoAck, 是否自动确认 * Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //加上休眠时间是为了区分两个消费者的区别 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //body接受的消息 System.out.println("消息者01:"+new String(body)); } }; channel.basicConsume("qy129_topic01",true,defaultConsumer); } }
消费者02:
package com.aaa.topic; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer02 { public static void main(String[] args) throws Exception{ //创建连接对象-配置连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.49.197"); //创建连接对象Connection Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //接受信息 /** * (String queue, 队列的名称 * boolean autoAck, 是否自动确认 * Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body接受的消息 System.out.println("消息者02:"+new String(body)); } }; channel.basicConsume("qy129_topic02",true,defaultConsumer); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。