赞
踩
MQ: Message Quene 消息队列
队列: 一种数据结构,先进先出
消息队列: 本质是个队列,只不过存放的是消息而已
消息中间件: 简单来说就是用来传输消息的中间载体
MQ的作用:
MQ的框架很多,比如ActiveMQ,RabbitMQ,RocketMQ,kafka,而我们使用的就是RabbitMQ
安装环境
yum install epel-release
yum install erlang
安装RabbitMQ
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
yum install rabbitmq-server-3.6.15-1.el7.noarch.rpm
设置为开机启动
systemctl enable rabbitmq-server.service
查看服务状态
systemctl status rabbitmq-server.service
启动服务
systemctl start rabbitmq-server.service
停止服务
systemctl stop rabbitmq-server.service
重启服务
systemctl restart rabbitmq-server.service
查看当前所有的用户
rabbitmqctl list_users
查看guest用户所有拥有的权限
rabbitmqctl list_user_permissions guest
删除guest用户
rabbitmqctl delete_user guest
添加一个新的用户
rabbitmqctl add_user xbb 123456
给用户设置角色
rabbitmqctl set_user_tags xbb administrator
给用户赋予权限
rabbitmqctl set_permissions -p / xbb ".*" ".*" ".*"
开启web的管理界面
rabbitmq-plugins enable rabbitmq_management
P:生产者
C:消费者
红色代表队列
生产者将消息发送到队列,消费者从队列获取消息
1.导入依赖
<!--导入RabbitMQ的相关的包-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.5.0</version>
</dependency>
2.获取MQ的连接
public class ConnectionUtils { /** * 获取连接 * @return */ public static Connection getConnection() throws IOException, TimeoutException { //申明连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接主机 connectionFactory.setHost("114.55.219.117"); //设置虚拟机 connectionFactory.setVirtualHost("/"); //设置访问的用户名 connectionFactory.setUsername("xbb"); //设置密码 connectionFactory.setPassword("123456"); //设置请求的端口 connectionFactory.setPort(5672); return connectionFactory.newConnection(); } }
3.生产者发送消息到队列
public class Producer { private static final String QUENE_NAME="helloworld"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建数据传输的通道 Channel channel = connection.createChannel(); //申明队列 /** * 第一个参数:队列的名字 * 第二个参数:是否持久化,发送到队列的消息,如果没有持久化重启会丢失 * 第三个参数:是否排外 * 1:连接关闭之后,这个队列是否自动删除 * 2:是否允许其他通道来进行访问 * 第四个参数:是否允许自动删除 * 第五个参数:申明队列的时候附带的一些参数 */ channel.queueDeclare(QUENE_NAME,false,false,false,null); //发送数据到队列 /** * 第一个参数:exchange 交换机 * 第二个参数:路由的key,没有就使用队列的名字 * 第三个参数:发送数据到队列时携带的参数 * 第四个参数:向队列发送的数据 */ channel.basicPublish("",QUENE_NAME,null,"helloworld".getBytes()); //释放资源 channel.close(); connection.close(); } }
4.消费者从队列获取消息
public class Consumer { private static final String QUEUE_NAME="helloworld"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //消费者的申明 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消息的唯一标识 * @param envelope 请求消息属性的封装 * @param properties 前面队列携带过来的值 * @param body 接受到的消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受到的消息是:"+new String(body)); //进行手动应答 /** * 第一个参数:自动应答 * 第二个参数:false表示收到消息了 */ channel.basicAck(envelope.getDeliveryTag(),false); } }; //绑定消费者 /** * 第一个参数:队列名字 * 第二个参数:是否自动应答 * 第三个参数:消费者 */ channel.basicConsume(QUEUE_NAME,false,defaultConsumer); } }
多个消费者消费的数据之和才是原来队列中的所有数据 适用于流量的消峰
消费者1:
public class Consumer1 { private static final String QUEUE_NAME="work"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //消费者的申明 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消息的唯一标识 * @param envelope 请求消息属性的封装 * @param properties 前面队列携带过来的值 * @param body 接受到的消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1接受到的消息是:"+new String(body)); //进行手动应答 /** * 第一个参数:自动应答的标记 * 第二个参数:false表示收到消息了 */ channel.basicAck(envelope.getDeliveryTag(),false); } }; //绑定消费者 /** * 第一个参数:队列名字 * 第二个参数:是否自动应答 * 第三个参数:消费者 */ channel.basicConsume(QUEUE_NAME,false,defaultConsumer); } }
消费者2:
public class Consumer2 { private static final String QUEUE_NAME="work"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //消费者的申明 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消息的唯一标识 * @param envelope 请求消息属性的封装 * @param properties 前面队列携带过来的值 * @param body 接受到的消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2接受到的消息是:"+new String(body)); //进行手动应答 /** * 第一个参数:自动应答 * 第二个参数:false表示收到消息了 */ channel.basicAck(envelope.getDeliveryTag(),false); } }; //绑定消费者 /** * 第一个参数:队列名字 * 第二个参数:是否自动应答 * 第三个参数:消费者 */ channel.basicConsume(QUEUE_NAME,false,defaultConsumer); } }
生产者:
public class Producer { private static final String QUEUE_NAME="work"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //发送消息 for (int i=0;i<100;i++){ channel.basicPublish("",QUEUE_NAME,null,(""+i).getBytes()); } channel.close(); connection.close(); } }
生产者没有将消息发送到队列而是发送到交换机,每个消费者都有自己的队列,每个队列都要绑定到交换机,消费者获取到生产者发送的信息是完整的
消费者1:
public class Consumer1 { private static final String QUEUE_NAME="work1"; private static final String EXCHANGE_NAME="fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //申明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); /**将队列绑定到交换机 * 第一个参数:队列的名字 * 第二个参数:交换机的名字 * 第三个参数:路由的key */ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1接受到的消息是:"+new String(body)); //进行手动应答 channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
消费者2:
public class Consumer2 { private static final String QUEUE_NAME="work2"; private static final String EXCHANGE_NAME="fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //申明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); /**将队列绑定到交换机 * 第一个参数:队列的名字 * 第二个参数:交换机的名字 * 第三个参数:路由的key */ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2接受到的消息是:"+new String(body)); //进行手动应答 channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
生产者:
public class publish { //交换机的名字 private static final String EXCHANGE_NAME="fanout"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //申明交换机 /** * 第一个参数:交换机的名字 * 第二个参数:交换机的类型 * 如果使用的是发布订阅模式:只能写fanout */ channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //发送消息到交换机 for (int i = 0; i<100;i++){ channel.basicPublish(EXCHANGE_NAME,"",null,("发布订阅模式的值"+i).getBytes()); } channel.close(); connection.close(); } }
生产者:
public class Producer { private static final String EXCHANGE_NAME="test_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //如果是路由模型,第二个参数必须为direct channel.exchangeDeclare(EXCHANGE_NAME,"direct"); //发送消息到交换机 for (int i = 0; i < 100; i++) { if(i%2==0){ //将偶数项的消息发送到路由为hello的队列 channel.basicPublish(EXCHANGE_NAME,"hello",null,(EXCHANGE_NAME+i).getBytes()); }else { //将奇数项的消息发送到路由为world的队列 channel.basicPublish(EXCHANGE_NAME,"world",null,(EXCHANGE_NAME+i).getBytes()); } } channel.close(); connection.close(); } }
消费者1:
public class Consumer1 { private static final String QUEUE_NAME="direct_queue_1"; private static final String EXCHANGE_NAME="test_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //申明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); //绑定队列到交换机 //第二个参数为路由的key channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"hello"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由hello的队列接受的消息为"+new String(body)); } }; //绑定消费者 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
消费者2:
public class Consumer2 { private static final String QUEUE_NAME="direct_queue_2"; private static final String EXCHANGE_NAME="test_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //申明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); //绑定队列到交换机 //第二个参数为路由的key channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"world"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由world的队列接受的消息为"+new String(body)); } }; //绑定消费者 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
topic模式相当于是对 路由模式的一个升级 topic模式主要就是在匹配的规则上可以实现模糊匹配
*:只能匹配一个单词 如:user.* user.username
#:可以匹配一个到多个单词 如:user.# user.username.xbb
生产者:
public class Producer { private static final String QUEUE_NAME="topic_queue1"; private static final String EXCHANE_NAME="test_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //交换机 channel.exchangeDeclare(EXCHANE_NAME,"topic"); for (int i = 0; i < 100; i++) { channel.basicPublish(EXCHANE_NAME,"hello.world",null,("topic模型"+i).getBytes()); } channel.close(); connection.close(); } }
消费者1:
public class Consumer1 { private static final String QUEUE_NAME="topic_hello"; private static final String EXCHANGE_NAME="test_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //申明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"hello.*"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由hello接受的数据:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
消费者2:
public class Consumer2 { private static final String QUEUE_NAME="topic_world"; private static final String EXCHANGE_NAME="test_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //申明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.world"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由world接受的数据:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
放到队列中的消息,怎么保证一定成功的放入队列了呢?
confirm机制:只要消息放入队列成功,那么队列就一定会给反馈
生产者:
public class Producer { private static final String QUEUE_NAME="test_confirm"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //开启confirm消息机制 channel.confirmSelect(); //对消息实施监听 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long l, boolean b) throws IOException { System.out.println("消息发送成功的监听"); } @Override public void handleNack(long l, boolean b) throws IOException { System.out.println("消息发送失败的监听"); } }); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicPublish("",QUEUE_NAME,null,"hello world".getBytes()); } }
生产者发送消息的时候,如果交换机不存在,或者路由的key不存在,这时候就需要监听这种到达不了的消息。
注意:当前的队列中必须要有消费者存在
生产者:
public class Producer { private static final String EXCHANGE_NAME="test_return"; //正确的路由的key private static final String ROUTING_KEY="return.hello"; //错误的路由的key private static final String ROUTING_ERROR_KEY="world"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.addReturnListener(new ReturnListener() { /** * * @param i 队列相应给浏览器的状态码 * @param s 状态码对于的文本信息 * @param s1 交换机的名字 * @param s2 路由的key * @param basicProperties 消息的属性 * @param bytes 消息体的内容 * @throws IOException */ @Override public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { System.out.println("监听到不可达的消息"); System.out.println("状态码"+i); System.out.println("文本信息"+s); System.out.println("交换机名字"+s1); System.out.println("路由的key"+s2); } }); //这里的第三个参数如果设置为true,表示要监听不可达的消息进行处理 //如果设置为false,那么队列会直接删除这个消息 channel.basicPublish(EXCHANGE_NAME,ROUTING_ERROR_KEY,true,null,"厕所return机制".getBytes()); } }
消费者:
public class Consumer { //交换机的名字 private static final String EXCHANE_NAME="test_return"; //队列的名字 private static final String QUEUE_NAME="queue_return"; //路由的key private static final String ROUTING_NAME="*.hello"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //申明交换机 channel.exchangeDeclare(EXCHANE_NAME,"topic"); //绑定 channel.queueBind(QUEUE_NAME,EXCHANE_NAME,ROUTING_NAME); //申明消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("收到消息了"); } }; //绑定消费者 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
假设消费者挂了,消息全部堆积到队列里面,然后当消费者重新启动时,队列里的消息就全部发送过来,但是客户端没办法同时去处理那么多的消息
这种场景下就需要对消费者进行限流
生产者:
public class Producer { private static final String QUEUE_NAME="test_limit"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); for (int i = 0; i <100 ; i++) { channel.basicPublish("",QUEUE_NAME,null,(i+"元钱").getBytes()); } channel.close(); connection.close(); } }
限流的消费者:
public class Consumer1 { private static final String QUEUE_NAME="test_limit"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); /** * 开启限流 * 第一个参数:消息本身的大小,如果设置为0,那么表示对消息大小不限制 * 第二个参数:一次性推送消息的最大数量,前提消息必须手动应答完成 * 第三个参数:true:将设置应用到通道 false:只是当前消费者的策略 */ channel.basicQos(0,5,false); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者收到了"+new String(body)+"元钱"); try { Thread.sleep(100); } catch (Exception e){ } //进行手动应答 channel.basicAck(envelope.getDeliveryTag(),false); } }; //使用限流必须手动应答 channel.basicConsume(QUEUE_NAME,false,defaultConsumer); } }
消费者2:
public class Consumer2 { private static final String QUEUE_NAME="test_limit"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者收到了"+new String(body)+"元钱"); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME,false,defaultConsumer); } }
给队列中的消息添加时间限制,如果超时这个消息会被删除
生产者:
public class Producer { private static final String QUEUE_NAME="test_ttl"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //ttl队列 Map<String,Object> map = new HashMap<>(); //设置5秒的过期时间 map.put("x-message-ttl",5000); channel.queueDeclare(QUEUE_NAME,false,false,false,map); for (int i = 0; i <100 ; i++) { channel.basicPublish("",QUEUE_NAME,null,(i+"元钱").getBytes()); } channel.close(); connection.close(); } }
什么是死信队列
当消息在队列中变成死信之后、可以定义它重新push 到另外一个交换机上、这个交换机 也有自己对应的队列 这个队列就称为死信队列
满足死信的条件:
当这个队列中如果有这个死信的时候、rabbitmq就会将这个消息自动发送到我们提前定义好的死信队列中去
生产者:
public class Producer {
private static final String EXCHANGE_NAME="ttl-dlx-exchange";
private static final String ROUTING_KEY="dlx.#";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,false,null,"nihaoa".getBytes());
}
}
消费者:
public class Consumer { //交换机名字 private static final String EXCHANGE_NAME="ttl-dlx-exchange"; //队列名 private static final String QUEUE_NAME="ttl-dlx-queue"; //死信队列 private static final String DLX_QUEUE_NAME="dlx-queue"; //死信交换机 private static final String DLX_EXCHANGE_NAME="dlx-exchange"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //申明队列 Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 5000); map.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); channel.queueDeclare(QUEUE_NAME, false, false, false, map); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dlx.#"); //绑定死信队列 channel.exchangeDeclare(DLX_EXCHANGE_NAME, "topic"); channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null); channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, "#"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("收到:" + new String(body)); } }; channel.basicConsume(DLX_QUEUE_NAME, true, defaultConsumer); } }
消费者除了手动签收应答还可以拒绝接受消息,让消息重回队列
生产者:
public class Producer { private static final String QUEUE_NAME="helloworld"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建数据传输的通道 Channel channel = connection.createChannel(); //申明队列 /** * 第一个参数:队列的名字 * 第二个参数:是否持久化,发送到队列的消息,如果没有持久化重启会丢失 * 第三个参数:是否排外 * 1:连接关闭之后,这个队列是否自动删除 * 2:是否允许其他通道来进行访问 * 第四个参数:是否允许自动删除 * 第五个参数:申明队列的时候附带的一些参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //发送数据到队列 /** * 第一个参数:exchange 交换机 * 第二个参数:路由的key,没有就使用队列的名字 * 第三个参数:发送数据到队列时携带的参数 * 第四个参数:向队列发送的数据 */ channel.basicPublish("",QUEUE_NAME,null,"helloworld".getBytes()); //释放资源 channel.close(); connection.close(); } }
消费者:
public class Consumer { private static final String QUEUE_NAME="helloworld"; public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //消费者的申明 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消息的唯一标识 * @param envelope 请求消息属性的封装 * @param properties 前面队列携带过来的值 * @param body 接受到的消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受到的消息是:"+new String(body)); /** * 第一个参数:自动应答 * 第二个参数:false表示收到消息了 * 第三个参数:表示决绝签收之后这个消息是否要重回队列? */ channel.basicNack(envelope.getDeliveryTag(),false,true); } }; //绑定消费者 /** * 第一个参数:队列名字 * 第二个参数:是否自动应答 * 第三个参数:消费者 */ channel.basicConsume(QUEUE_NAME,false,defaultConsumer); } }
1.消息的延迟投递来解决传递的可靠性
2.日志消息表实现可靠消息的传输
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。