赞
踩
https://www.rabbitmq.com/getstarted.html
第一种模型: HelloWorld简单消费,一个生产者和一个消费者
P:生产者,也就是要发送消息的程序;
C: 消费者,消息的接受者,会一直等待消息的到来。
queue: 也就是红色部分,消息队列,类似于一个邮箱,生产者可以投递消息,消费者可以取处消息;
<!--rabbitmq的相关依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
//创建连接mq的连接工厂对象,也就是连接server的对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("127.0.0.1");
//设置端口号
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/wang1");
//设置连接上面虚拟主机的用户名和密码
connectionFactory.setUsername("wang");
connectionFactory.setPassword("wang");
//获取通道连接虚拟主机的对象
Connection connection = connectionFactory.newConnection();
// Connection connection = RabbitMqUtil.getConnection();
//获取连接中的通道
Channel channel = connection.createChannel();
//设置通道的属性,咱们要通过通道给队列送消息
//绑定队列,如果咱们没在页面创建队列,那么也可以在这里使用代码自动创建,注意,一个通道可以绑定多个队列
// 参数1 : 队列名,如果队列不存在则自动创建
// 参数2: 用来定义队列的特性是否持久化, true持久化队列 false 不持久化队列,
// 注意:仅仅队列持久化,不保证该队列里的消息持久化
// 参数3: exclusive 是否独占队列 也就是该队列只允许当前连接Channel使用,
// 如果其他的连接或者通道来使用会报错, true独占队列 false不独占队列
// 参数4: autoDelete:是否在消费完成后自动删除队列 true自动删除,false不自动删除
//当设置为true时,消费者消费完消息且关闭通道连接后,队列会自动删除,如果消费完队列不关闭连接会导致队列被占用,不会自动删除
//参数5 : 额外的参数
channel.queueDeclare("wangChannel", true, false, false, null);
//真正发布消息
//参数1: 交换机名称,因为简单直连模式没用交换机,所以未空
//参数2: 队列名称
// 参数3:传递消息额外设置,设置为null,当rabbitmq重启时,未消费的消息会丢失,
//当设置为MessageProperties.PERSISTENT_TEXT_PLAIN,消息会持久化,即使rabbitmq重启也不影响
// 参数4: 消息体
//注意: 消息发布时通道绑定的队列属性必须和消息消费时通道绑定的队列属性一致,
// 不可能说发消息时队列是持久化的,而消费时不是持久化的,那样会报错
channel.basicPublish("", "wangChannel", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello,rabbitmq".getBytes());
//关闭通道
channel.close();
connection.close();
// RabbitMqUtil.closeConnectAndChannel(channel,connection);
//先创建连接server的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的端口
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
//连接哪个虚拟主机
connectionFactory.setVirtualHost("/wang1");
connectionFactory.setUsername("wang");
connectionFactory.setPassword("wang");
//创建通道连接虚拟主机的对象
Connection connection = connectionFactory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//通道绑定队列
channel.queueDeclare("wangChannel", false, false, false, null);
//消费消息 参数1:队列名称 参数2:是否自动确认 参数 接口回调执行
channel.basicConsume("wangChannel",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费消息:::::=> "+new String(body));
}
});
//如果不关闭,消费者会一直监听该队列
// channel.close();
// connection.close();
public class RabbitMqUtil {
private static final ConnectionFactory connectionFactory;
//rabbitmq安装的主机IP
private static final String Host = "127.0.0.1";
//rabbitmq访问端口
private static final int Port = 5672;
//虚拟主机名称
private static final String VirtualHost = "/wang1";
//用户名
private static final String UserName = "wang";
//密码
private static final String Password = "wang";
static {
//先创建连接server的连接
connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的IP,端口
connectionFactory.setHost(Host);
connectionFactory.setPort(Port);
//连接哪个虚拟主机
connectionFactory.setVirtualHost(VirtualHost);
//连接虚拟主机的用户名,密码
connectionFactory.setUsername(UserName);
connectionFactory.setPassword(Password);
}
public static Connection getConnection() {
try {
//创建通道连接虚拟主机的对象
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
//关闭通道和连接工具方法
public static void closeConnectAndChannel(Channel channel, Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
第二种模型:工作模型/任务模型
Work queues 工作队列也称为任务队列(Task queues),任务模型,当消息处理比较耗时时,可能生产消息的速度远远超过消息的消费速度,长期以往会导致消息堆积太多,无法及时处理,此时就需要用到work queues
队列中消息一旦被消费,就会消失,因为任务是不会被重复执行的。
//这里为了快捷我们就使用之前创建的工具类
//获取连接对象
Connection connection = RabbitMqUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//通道绑定队列
channel.queueDeclare("wangRabbitMqWork", true, false, false, null);
//发布消息,这里模拟一次性发布多条消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("","wangRabbitMqWork", MessageProperties.PERSISTENT_TEXT_PLAIN,(i+"我是工作模型队列").getBytes());
}
RabbitMqUtil.closeConnectAndChannel(channel, connection);
//这里为了快捷我们就使用之前创建的工具类
//获取连接对象
Connection connection = RabbitMqUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//通道绑定队列
channel.queueDeclare("wangRabbitMqWork", true, false, false, null);
//消费消息
channel.basicConsume("wangRabbitMqWork", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号 = " + new String(body));
}
});
//一般不关闭,让消费者监听队列消费消息
//这里为了快捷我们就使用之前创建的工具类
//获取连接对象
Connection connection = RabbitMqUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//通道绑定队列
channel.queueDeclare("wangRabbitMqWork", true, false, false, null);
//消费消息 参数1:队列名称 参数2:是否自动确认 参数 接口回调执行
channel.basicConsume("wangRabbitMqWork", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号 = " + new String(body));
}
});
//一般不关闭,让消费者监听队列消费消息
生产者生产消息,消费者1和消费者2是平均消费
那么问题来了: 循环平均分配,如果消费者2处理特别慢,是不是很影响性能
Rabbitmq的循环平均分配取决于它的消息确认机制,队列一次性把多个消息平均分配给各个消费者
消费者拿到消息就立马自动确认,而不管后续是否正常消费,这样会导致消息丢失问题,所以为了实现能者多劳模式,
我们采用手动确认机制,且每次只从消息队列取一条,这样确保消息不丢失.
修改消费者为能者多劳,手动确认消息
//这里为了快捷我们就使用之前创建的工具类
//获取连接对象
Connection connection = RabbitMqUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//通道绑定队列
channel.queueDeclare("wangRabbitMqWork", true, false, false, null);
//每次只取一条,消费完再取,确保消息不丢失,安全
channel.basicQos(1);
//消费消息 参数2 关闭消息自动确认,因为当它为true时,不管下面的代码是否正常执行,
// 它就向消息队列自动确认,这时队列会删除消息,这就会产生消息丢失问题
channel.basicConsume("wangRabbitMqWork", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号 = " + new String(body));
//参数1: 确认消息队列中具体哪个消息确认,相当于一个标识
//参数2: 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
//一般不关闭,让消费者监听队列消费消息
第三种:Fanout广播模型 fanout:扇出,也成为广播
1.可以有多个消费者
2.每个消费者都有自己的队列
3.每个队列都要绑定到Exchange(交换机) 注:其实我们之前的HelloWord或者work模型都存在交换机,虽然我们写了“”,但是是存在的,只不过是rabbitmq默认的
4.生产者发送的消息,只能发送到交换机,交换机来决定要发到哪个队列,生产者无法决定。
5.交换机把消息发给绑定的所有队列。
6.队列的消费者都能拿到消息。实现一条消息被多个消费者消费
//获取连接对象
Connection connection = RabbitMqUtil.getConnection();
//获取通道
Channel channel = connection.createChannel();
//将通道绑定交换机 //参数1: 交换机名称 参数2: 交换机类型: fanout 广播类型
channel.exchangeDeclare("wangExChange", "fanout");
//真正发布消息
//参数1: 交换机名称,因为简单直连模式没用交换机,所以未空
//参数2: 队列名称
// 参数3:传递消息额外设置,设置为null,当rabbitmq重启时,未消费的消息会丢失,
//当设置为MessageProperties.PERSISTENT_TEXT_PLAIN,消息会持久化,即使rabbitmq重启也不影响
// 参数4: 消息体
//注意: 消息发布时通道绑定的队列属性必须和消息消费时通道绑定的队列属性一致,
// 不可能说发消息时队列是持久化的,而消费时不是持久化的,那样会报错
channel.basicPublish("wangExChange","",null,"此时是广播模式".getBytes());
//释放资源
RabbitMqUtil.closeConnectAndChannel(channel, connection);
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("wangExChange", "fanout");
//获取临时队列的名字
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列 参数1: 临时队列名称 参数2:交换机名称 参数3 :路由 这里暂时用不到
channel.queueBind(queueName, "wangExChange", "");
//消费消息 参数1:队列名称 参数2:是否自动确认
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是广播模型消费者1号" + new String(body));
}
});
问题: 在Fanout模式中,一条消息,会被所有的订阅的队列所消费,但是,在某些场景下,我们希望不同的消息被不同的队列消费,这时就要用到Direct类型的Exchange。
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送在Exchange发送消息时,也必须指定消息的RoutingKey
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断。只有队列的RoutingKey与消息的Routing Key完全一致,才会接收到消息流程。
//获取连接对象
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//将通道声明指定交换机 参数1:交换机名称 参数2: direct 路由模式
channel.exchangeDeclare("logs_direct", "direct");
//发送消息
String routingKey = "info";
channel.basicPublish("logs_direct", routingKey, null, "现在是路由模式".getBytes());
//关闭消息
RabbitMqUtil.closeConnectAndChannel(channel, connection);
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct", "direct");
//创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
//基于routekey绑定队列和交换机
channel.queueBind(queueName, "logs_direct", "info");
//获取消费的消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是路由模式消费者2号"+new String(body));
}
});
Topics模式,也称为动态路由模式,
其实就是在第四种路由模型Route的基础只是,将原先写死的路由key改成动态匹配
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列,
只不过Topic类型Exchange可以让队列在绑定Routing Key的时候使用通配符!
这种模型RoutingKey一般都是由一个或多个单词组成,多个单词之间以"."分割,例如 inem.user
* (star) can substitute for exactly one word. 匹配不多不少恰好一个词
# (hash) can substitute for zero or more words. 匹配0或更多个词
举个列子:
item.* 能匹配item.save 但是不能匹配item或者item.save.save,有且仅有一个
item.# 能匹配item/item.save/item.save.save 说白了只要是item开头都可以
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机以及交换机类型 topic
channel.exchangeDeclare("wangTopics", "topic");
//发布消息
//声明RoutingKey
String routeKey = "user.save";
channel.basicPublish("wangTopics",routeKey,null,"这里是动态路由交换机Topic".getBytes());
//关闭资源
RabbitMqUtil.closeConnectAndChannel(channel, connection);
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机类型
channel.exchangeDeclare("wangTopics", "topic");
//创建临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,动态通配符形式route key
channel.queueBind(queueName, "wangTopics", "user.*");
//消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("动态路由消费者消费消息" + new String(body));
}
});
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。