赞
踩
简单模式:生产者发布Publish消息到队列,消费者从队列消费Consume消息
生产者代码:
- package com.example.simple;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class SimpleProvider {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建链接工厂对象
- ConnectionFactory connectionFactory = new ConnectionFactory();
- //设置RabbitMQ服务主机地址,默认localhost
- connectionFactory.setHost("localhost");
- //设置端口,默认5672
- connectionFactory.setPort(5672);
- //设置虚拟主机名,默认/
- connectionFactory.setVirtualHost("/mytest");
- //设置用户名密码
- connectionFactory.setUsername("test");
- connectionFactory.setPassword("123456");
-
- //创建链接
- Connection connection = connectionFactory.newConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明队列
- //参数1:消息队列名称
- //参数2,是否持久化
- //参数3,是否独占本次connection链接
- //参数4,是否自动删除消息
- //参数5,是否需要传递额外的队列数据Map
- channel.queueDeclare("simple_queue1",true,false,false,null);
- //创建消息
- String msg = "hello";
- //发布消息
- //参数1,使用指定的交换机,不设置则使用默认交换机
- //参数2,指定路由器key,如果是简单模式直接给队列名就行
- //参数3,发生消息时是否需要额外的消息数据
- //参数4,消息内容
- channel.basicPublish("","simple_queue1",null,msg.getBytes());
- //关闭资源
- channel.close();
- connection.close();
- }
- }

消费者代码
- package com.example.simple;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class SimpleConsumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建链接工厂对象
- ConnectionFactory connectionFactory = new ConnectionFactory();
- //设置RabbitMQ服务主机地址,默认localhost
- connectionFactory.setHost("localhost");
- //设置端口,默认5672
- connectionFactory.setPort(5672);
- //设置虚拟主机名,默认/
- connectionFactory.setVirtualHost("/mytest");
- //设置用户名密码
- connectionFactory.setUsername("test");
- connectionFactory.setPassword("123456");
-
- //创建链接
- Connection connection = connectionFactory.newConnection();
- //创建频道
- Channel channel = connection.createChannel();
-
- //声明队列
- //参数1:消息队列名称
- //参数2,是否持久化
- //参数3,是否独占本次connection链接
- //参数4,是否自动删除消息
- //参数5,是否需要传递额外的队列数据Map
- channel.queueDeclare("simple_queue1",true,false,false,null);
- //创建消费者,并且设置消息处理
- //指定频道
- DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
- /**
- *
- * @param consumerTag 消费者名称,如果之前没有指定,则会自动生成一个
- * @param envelope 消息一些额外的参数 channel.basicPublish("","simple_queue1",null,msg.getBytes());
- * @param properties 另外的数据所封装的属性对象 null
- * @param body 消息本身
- * @throws IOException
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //实现消费的业务逻辑
- System.out.println("consumerTag"+consumerTag);
- System.out.println("交换机:"+envelope.getExchange()+"----RoutingKey:"+envelope.getRoutingKey()+"----DeliveryTag:"+envelope.getDeliveryTag());
- System.out.println("消息本身:"+new String(body,"UTF-8"));
- }
- };
- //监听消息
- //参数1,指定要监听的队列的名称
- //参数2,设置消息的应答模式,true自动应答(性能好,消息有可能丢失),false手动应答
- //参数3,
- channel.basicConsume("simple_queue1",true,defaultConsumer);
-
-
- //关闭资源,建议不关闭一直监听
- }
- }

工作模式:多个消费者消费多个不同的消息,创建多个消费者即可
订阅-广播模式:同一个消息被多个消费者消费。上面两个模式只有三个角色分别是生产者,队列,消费者。而在订阅模式中多了一个角色,那就是exchange交换机角色。
订阅模式大概过程是:生产者生产消息后,发送给交换机,然后由交换机转发给各个队列,在这之前队列需要和交换机进行绑定。 .
生产者代码
- package com.example.fanout;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class FanoutProvider {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建链接工厂对象
- ConnectionFactory connectionFactory = new ConnectionFactory();
- //设置RabbitMQ服务主机地址,默认localhost
- connectionFactory.setHost("localhost");
- //设置端口,默认5672
- connectionFactory.setPort(5672);
- //设置虚拟主机名,默认/
- connectionFactory.setVirtualHost("/mytest");
- //设置用户名密码
- connectionFactory.setUsername("test");
- connectionFactory.setPassword("123456");
-
- //创建链接
- Connection connection = connectionFactory.newConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明队列
- //参数1:消息队列名称
- //参数2,是否持久化
- //参数3,是否独占本次connection链接
- //参数4,是否自动删除消息
- //参数5,是否需要传递额外的队列数据Map
- channel.queueDeclare("fanout_queue1",true,false,false,null);
- //声明两个队列
- channel.queueDeclare("fanout_queue2",true,false,false,null);
-
- //声明交换机
- //参数1 交换机名称
- //参数2,指定交换机的类型 BuiltinExchangeType.FANOUT(广播)
- channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
- //将队列绑定到指定的交换机
- //参数1,指定要绑定的队列
- //参数2,指定绑定的交换机
- //参数3,指定routingkey,广播模式可以不写
- channel.queueBind("fanout_queue1","fanout_exchange","");
- channel.queueBind("fanout_queue2","fanout_exchange","");
-
- //创建消息
- String msg = "hello";
- //发布消息
- //参数1,使用指定的交换机,不设置则使用默认交换机
- //参数2,指定路由器key,如果是简单模式直接给队列名就行,广播模式指定为空
- //参数3,发生消息时是否需要额外的消息数据
- //参数4,消息内容
- channel.basicPublish("fanout_exchange","",null,msg.getBytes());
- //关闭资源
- channel.close();
- connection.close();
- }
- }

消费者1
- package com.example.fanout;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class FanoutConsumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建链接工厂对象
- ConnectionFactory connectionFactory = new ConnectionFactory();
- //设置RabbitMQ服务主机地址,默认localhost
- connectionFactory.setHost("localhost");
- //设置端口,默认5672
- connectionFactory.setPort(5672);
- //设置虚拟主机名,默认/
- connectionFactory.setVirtualHost("/mytest");
- //设置用户名密码
- connectionFactory.setUsername("test");
- connectionFactory.setPassword("123456");
-
- //创建链接
- Connection connection = connectionFactory.newConnection();
- //创建频道
- Channel channel = connection.createChannel();
-
- //声明队列
- //参数1:消息队列名称
- //参数2,是否持久化
- //参数3,是否独占本次connection链接
- //参数4,是否自动删除消息
- //参数5,是否需要传递额外的队列数据Map
- channel.queueDeclare("fanout_queue1",true,false,false,null);
- //创建消费者,并且设置消息处理
- //指定频道
- DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
- /**
- *
- * @param consumerTag 消费者名称,如果之前没有指定,则会自动生成一个
- * @param envelope 消息一些额外的参数 channel.basicPublish("","simple_queue1",null,msg.getBytes());
- * @param properties 另外的数据所封装的属性对象 null
- * @param body 消息本身
- * @throws IOException
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //实现消费的业务逻辑
- System.out.println("consumerTag"+consumerTag);
- System.out.println("交换机:"+envelope.getExchange()+"----RoutingKey:"+envelope.getRoutingKey()+"----DeliveryTag:"+envelope.getDeliveryTag());
- System.out.println("消息本身:"+new String(body,"UTF-8"));
- }
- };
- //监听消息
- //参数1,指定要监听的队列的名称
- //参数2,设置消息的应答模式,true自动应答(性能好,消息有可能丢失),false手动应答
- //参数3,
- channel.basicConsume("fanout_queue1",true,defaultConsumer);
-
-
- //关闭资源,建议不关闭一直监听
- }
- }

消费者2
- package com.example.fanout;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class FanoutConsumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建链接工厂对象
- ConnectionFactory connectionFactory = new ConnectionFactory();
- //设置RabbitMQ服务主机地址,默认localhost
- connectionFactory.setHost("localhost");
- //设置端口,默认5672
- connectionFactory.setPort(5672);
- //设置虚拟主机名,默认/
- connectionFactory.setVirtualHost("/mytest");
- //设置用户名密码
- connectionFactory.setUsername("test");
- connectionFactory.setPassword("123456");
-
- //创建链接
- Connection connection = connectionFactory.newConnection();
- //创建频道
- Channel channel = connection.createChannel();
-
- //声明队列
- //参数1:消息队列名称
- //参数2,是否持久化
- //参数3,是否独占本次connection链接
- //参数4,是否自动删除消息
- //参数5,是否需要传递额外的队列数据Map
- channel.queueDeclare("fanout_queue2",true,false,false,null);
- //创建消费者,并且设置消息处理
- //指定频道
- DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
- /**
- *
- * @param consumerTag 消费者名称,如果之前没有指定,则会自动生成一个
- * @param envelope 消息一些额外的参数 channel.basicPublish("","simple_queue1",null,msg.getBytes());
- * @param properties 另外的数据所封装的属性对象 null
- * @param body 消息本身
- * @throws IOException
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //实现消费的业务逻辑
- System.out.println("consumerTag"+consumerTag);
- System.out.println("交换机:"+envelope.getExchange()+"----RoutingKey:"+envelope.getRoutingKey()+"----DeliveryTag:"+envelope.getDeliveryTag());
- System.out.println("消息本身:"+new String(body,"UTF-8"));
- }
- };
- //监听消息
- //参数1,指定要监听的队列的名称
- //参数2,设置消息的应答模式,true自动应答(性能好,消息有可能丢失),false手动应答
- //参数3,
- channel.basicConsume("fanout_queue2",true,defaultConsumer);
-
-
- //关闭资源,建议不关闭一直监听
- }
- }

订阅-Routing路由模式:
生产者代码
- package com.example.routingkey;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- import java.util.concurrent.TimeoutException;
-
- public class RoutingKeyProvider {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建链接工厂对象
- ConnectionFactory connectionFactory = new ConnectionFactory();
- //设置RabbitMQ服务主机地址,默认localhost
- connectionFactory.setHost("localhost");
- //设置端口,默认5672
- connectionFactory.setPort(5672);
- //设置虚拟主机名,默认/
- connectionFactory.setVirtualHost("/mytest");
- //设置用户名密码
- connectionFactory.setUsername("test");
- connectionFactory.setPassword("123456");
-
- //创建链接
- Connection connection = connectionFactory.newConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明队列
- //参数1:消息队列名称
- //参数2,是否持久化
- //参数3,是否独占本次connection链接
- //参数4,是否自动删除消息
- //参数5,是否需要传递额外的队列数据Map
- channel.queueDeclare("routingkey_queue1",true,false,false,null);
- //声明两个队列
- channel.queueDeclare("routingkey_queue2",true,false,false,null);
-
- //声明交换机
- //参数1 交换机名称
- //参数2,指定交换机的类型 BuiltinExchangeType.FANOUT(广播) BuiltinExchangeType.DIRECT(路由模式)
- channel.exchangeDeclare("routingkey_exchange", BuiltinExchangeType.DIRECT);
- //将队列绑定到指定的交换机
- //参数1,指定要绑定的队列
- //参数2,指定绑定的交换机
- //参数3,指定routingkey,广播模式可以不写, routingkey模式要写
- channel.queueBind("routingkey_queue1","routingkey_exchange","item.insert");
- channel.queueBind("routingkey_queue2","routingkey_exchange","item.delete");
-
- //创建消息
- String msg = "增加操作";
- //发布消息
- //参数1,使用指定的交换机,不设置则使用默认交换机
- //参数2,指定路由器key,如果是简单模式直接给队列名就行,广播模式指定为空
- //参数3,发生消息时是否需要额外的消息数据
- //参数4,消息内容
- channel.basicPublish("routingkey_exchange","item.insert",null,msg.getBytes(StandardCharsets.UTF_8));
- String msg2 = "删除操作";
- //发布消息
- //参数1,使用指定的交换机,不设置则使用默认交换机
- //参数2,指定路由器key,如果是简单模式直接给队列名就行,广播模式指定为空
- //参数3,发生消息时是否需要额外的消息数据
- //参数4,消息内容
- channel.basicPublish("routingkey_exchange","item.delete",null,msg2.getBytes(StandardCharsets.UTF_8));
- //关闭资源
- channel.close();
- connection.close();
- }
- }

消费者代码还跟上面一样,改一下对应的队列名就行
订阅-Topics通配符模式(主题模式)
RoutingKey一般是由一个或者多个单词组成,多个单词之间以.分割。例如:item.insert
通配符规则:
#:匹配一个或者多个词
*:匹配不多不少一个词
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。