赞
踩
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
同时去掉测试依赖的<scope>test</scope>
,不然一会不能进行单元测试
在上图的模型中,有以下概念:
前提:
public class Producer { @Test public void pro() throws IOException, TimeoutException { //创建连接工厂 //创建连接mq的连接工厂对象 ConnectionFactory factory = new ConnectionFactory(); //设置连接rabbitmq主机 factory.setHost("192.168.77.138"); //设置端口号 factory.setPort(5672); //设置访问虚拟主机的用户名和密码 factory.setUsername("ems"); factory.setPassword("123"); //设置连接那个虚拟主机 factory.setVirtualHost("/ems"); //获取连接对象 Connection connection = factory.newConnection(); //获取连接中通道 Channel channel = connection.createChannel(); //通道绑定对应消息队列 //参数1: 队列名称 如果队列不存在自动创建 //参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化 //参数3: exclusive 是否独占队列 true 独占队列 false 不独占 //参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除 //参数5: 额外附加参数 channel.queueDeclare("hello", true, false, false, null); //发布消息 //参数1: 交换机名称 参数2:队列名称 参数3:传递息额外设置 参数4:消息的具体内容 channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitmq".getBytes()); //关闭连接 channel.close(); connection.close(); } }
生产消息到队列:
消费者端不能使用单元测试,需要一直监听
同时不关闭连接消息通道与连接
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.77.138"); factory.setPort(5672); factory.setVirtualHost("/ems"); factory.setUsername("ems"); factory.setPassword("123"); Connection connection = factory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //通道绑定队列:与生产端一致 channel.queueDeclare("hello", true, false, false, null); //获取消息 //参数1: 消费那个队列的消息 队列名称 //参数2: 开始消息的自动确认机制[只要消费就从队列删除消息] //参数3: 消费时的回调接口 channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("取出消息:===>" + new String(body)); } }); } }
注意:生产方队列的声明与消费方队列的声明要一致
channel.queueDeclare("hello",true,false,false,null);
# '参数1': 用来声明通道对应的队列 [ 如果不存在队列,会自动创建]
# '参数2': 用来指定是否持久化队列 [D: 指定后 rabbitmq 重启后不会消失 ]
# '参数3': 用来指定是否独占队列 [ 只允许当特定的连接]
# '参数4': 用来指定是否自动删除队列 [A:消息方消费完成后,队列自动删除]
# '参数5': 对队列的额外配置
Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
public class Producer { @Test public void pro() throws IOException { //获取连接 Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare("work", true, false, false, null); //发送消息 for (int i = 1; i <= 20; i++) { channel.basicPublish("", "work", null, (i + "号消息").getBytes()); } //关闭连接 RabbitMQUtils.closeConnection(channel, connection); } }
public class Consumer1 { public static void main(String[] args) throws IOException { //获取连接 Connection connection = RabbitMQUtils.getConnection(); //创建通道 final Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //处理消息比较慢 2秒处理一个消息 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者1号:==>消费" + new String(body)); } }); } }
public class Consumer2 { public static void main(String[] args) throws IOException { //获取连接 Connection connection = RabbitMQUtils.getConnection(); //创建通道 final Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2号:==>消费" + new String(body)); } }); } }
总结: 默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
如果想要实现消费快的多消费,消费慢的少消费,就需要对rabbitmq的消息确认机制进行配置
同时避免消息丢失
两点:
设置通道一次只能消费一个消息
关闭消息的自动确认,开启手动确认消息
消费方:
//步骤一:一次只接受一条未确认的消息
channel.basicQos(1);
//步骤二:[参数2:关闭自动确认消费]
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号:==>消费" + new String(body));
//消费完成后,手动确认消息 [ 参数1:确认标识,参数2:是否一次确认多条消息]
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
注意:消费方都需要进行手动确认,否则队列中不会确认删除
生产方:
//一次只发送一条消息
channel.basicQos(1);
测试:
消费者1号消费一条消息,2s后确认消息,消费者2号消费一条消息,手动确认,消息从队列删除,继续从队列取得一条进行消费,直到队列不存在消息
中文翻译:fanout 扇出 ,也称为广播
在广播模式下,消息发送流程是这样的:
广播 :一条消息多个消费者同时消费
public class Producer { @Test public void pro() throws IOException { //获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //声明交换机 [ 参数一:交换机名字,参数2:交换机类型:fanout(广播模式) 固定] channel.exchangeDeclare("register", "fanout"); //发布消息[参数1:交换机名字,参数2:路由,参数3:消息持久化,参数4:消息内容] channel.basicPublish("register", "", null, "fanout...".getBytes()); //关闭 RabbitMQUtils.closeConnection(channel, connection); } }
public class Consumer1 { public static void main(String[] args) throws IOException { //获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare("register", "fanout"); //创建临时队列 String queue = channel.queueDeclare().getQueue(); //[绑定]=>临时队列和交换机 [参数1:临时队列,参数2:交换机,参数3:路由] channel.queueBind(queue, "register", ""); //消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1号:==>" + new String(body)); } }); } }
public class Consumer2 { public static void main(String[] args) throws IOException { //获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare("register", "fanout"); //创建临时队列 String queue = channel.queueDeclare().getQueue(); //绑定临时队列和交换机 [参数1:临时队列,参数2:交换机,参数3:路由] channel.queueBind(queue, "register", ""); //消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2号:==>" + new String(body)); } }); } }
public class Consumer3 { public static void main(String[] args) throws IOException { //获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare("register", "fanout"); //创建临时队列 String queue = channel.queueDeclare().getQueue(); //绑定临时队列和交换机 [参数1:临时队列,参数2:交换机,参数3:路由] channel.queueBind(queue, "register", ""); //消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者3号:==>" + new String(body)); } }); } }
启动消费者,再启动生产者往交换机发送消息,三个消费者同时接收到消息
Routing 之订阅模型-Direct(直连)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange
在Direct模型中,流程如下:
图解:
public class Producer { @Test public void pro() throws IOException { //获取连接 Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机[参数1:交换机名字, 参数2:交换机类型,direct路由模式]=>基于指令的 Routing key 转发 channel.exchangeDeclare("logs_direct", "direct"); //发布的路由名称==>根据路由key的不同发送到不同的绑定队列中 String key = "error"; //发布消息[参数1:交换机名字,参数2:路由名字,参数3:消息内容] channel.basicPublish("logs_direct", key, null, ("发送给指定路由" + key + "的消息").getBytes()); //关闭连接 RabbitMQUtils.closeConnection(channel, connection); } }
public class Consumer1 { public static void main(String[] args) throws IOException { //获取连接 Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机[参数1:交换机名字, 参数2:交换机类型,direct路由模式] channel.exchangeDeclare("logs_direct", "direct"); //创建临时队列 String queue = channel.queueDeclare().getQueue(); //绑定临时队列与交换机并设置指定路由名称 channel.queueBind(queue, "logs_direct", "info"); channel.queueBind(queue, "logs_direct", "error"); channel.queueBind(queue, "logs_direct", "warn"); //消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1号:==>" + new String(body)); } }); } }
public class Consumer2 { public static void main(String[] args) throws IOException { //获取连接 Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机[参数1:交换机名字, 参数2:交换机类型,direct路由模式] channel.exchangeDeclare("logs_direct", "direct"); //创建临时队列 String queue = channel.queueDeclare().getQueue(); //绑定临时队列与交换机并设置指定路由名称 channel.queueBind(queue, "logs_direct", "error"); //消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2号:==>" + new String(body)); } }); } }
Routing 之订阅模型-Topic
Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列
只不过Topic
类型 Exchange 可以让队列在绑定Routing key
的时候使用通配符!
这种模型 Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
# 统配符
*(star) can substitute for exactly one word. 匹配不多不少恰好1个词
#(hash) can substitute for zero or more words. 匹配一个或多个词
# 如:
audit.# 匹配audit.irs.corporate或者 audit.irs 等
audit.* 只能匹配 audit.irs
public class Producer { @Test public void pro() throws IOException { //创建连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //声明交换机[交换机名字+交换机类型] channel.exchangeDeclare("topics", "topic"); //发布消息==>使用动态路由(通配符方式) String key = "user.update"; //指定发布的路由key channel.basicPublish("topics", key, null, ("发送消息给指定的路由key" + key).getBytes()); //关闭连接 RabbitMQUtils.closeConnection(channel, connection); } }
Routing Key中使用*通配符方式
public class Consumer1 { public static void main(String[] args) throws IOException { //创建连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare("topics", "topic"); //声明临时队列 String queue = channel.queueDeclare().getQueue(); //绑定临时队列与交换机并设置获取交换机中动态路由 String key = "user.*";//使用通配符指定路由key channel.queueBind(queue, "topics", key); //消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1号:===>" + new String(body)); } }); } }
Routing Key中使用*通配符方式
public class Consumer2 { public static void main(String[] args) throws IOException { //创建连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare("topics", "topic"); //声明临时队列 String queue = channel.queueDeclare().getQueue(); //绑定临时队列与交换机 String key = "user.#";//使用通配符指定路由key channel.queueBind(queue, "topics", key); //消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2号:===>" + new String(body)); } }); } }
两个消费者都可以消费消息
仅消费者2号可以消费消息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。