赞
踩
目录
1.1 自定义消费者 1.2 消息的ack与重回队列 1.3 消息的限流 1.4 TTL消息 1.5 死信队列
生产端-可靠性投递
(1)消息落库, 对消息状态进行打标,
(设置状态值),收到消息,把状态修改,针对没有响应的消息,轮循发送,设置最大的尝试次数;
但是在高并发情况下,多次对消息落库,对数据库造成很大压力,在高并发的场景下可能不合适
(2)消息的延迟投递,做二次确认,回调检查;(比较可靠,但是也不一定是100%)
幂等性是什么, 数据库乐观锁机制很象, 不管执行多少次,他的结果是相同的;
比如, update t_reps set count = count -1 , version = version + 1 where version = 1,
1.在海量订单产生的业务高峰期,如何避免消息的重复消费问题
(1)消费端实现幂等性, 就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息, 代码跑多次,结果是一次,
2.业界主流的幂等性操作
生成一个全局的ID, 指纹码(可能是业务规则,时间戳等,保证这次操作是唯一的)
select count (1) from T_ORDER where id = 唯一id + 指纹码;
好处, 实现简单
坏处:高并发情况系, 数据库写入的性能瓶颈;
解决:根据id进行分库分表进行算法路由
set 会自动更新,或者自增;需要考虑的问题
第一: 我们是否要进行数据库落库, 如果落库的话, 关键解决的问题是数据库和缓存如何做到原子性?
1.消息的确认,是指生产者投递消息后, 如果broker收到消息,则会给我们生产者一个应答
2.生产者进行接收应答,用来确定这条消息是否正常发送到broker, 这种方式也是可靠性投递的核心保障
1) 在channel 上开启确认模式: channel.confirmSelect();
2) 在channel 上添加监听: addConfirmListener, 监听成功和失败的返回结果, 根据具体的结果对消息进行重新发送,或者记录日志, 等后续处理
- public class Produce {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.创建ConnectionFactory
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
-
- // 2. 获取Connection
- Connection connection = connectionFactory.newConnection();
-
- //3. 通过Connection 创建一个新的Channel
- Channel channel = connection.createChannel();
-
- // 4 指定消息投递模式,:消息的确认的模式
- channel.confirmSelect();
-
- String exchangeName = "test_confirm_exchange";
- String routingKey = "confirm.save";
-
- // 5 发送一条消息
- String msg = "Hello RabbitMq sen confirm message";
- channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
-
- // 6. 添加一个监听
- channel.addConfirmListener(new ConfirmListener() {
-
- // 成功
- // 关键的唯一的消息标签deliveryTag, multiple :是否批量, 暂时不用管
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("成功 Ack");
- }
-
- // 失败, 比如磁盘写满了,mq 出现一些异常, key 容量达到上限,也有可能handleAck,handleNack 都没有收到, 进行抓取,和重发
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("失败 Nack");
- }
- });
-
-
- }
- }

消费者
- public class Consumer {
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- // 1.创建ConnectionFactory
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
-
- // 2. 获取Connection
- Connection connection = connectionFactory.newConnection();
-
- //3. 通过Connection 创建一个新的Channel
- Channel channel = connection.createChannel();
-
-
- String exchangeName = "test_confirm_exchange";
- String routingKey = "confirm.*";
-
- // AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable)
- // 4.声明一个交换机和队列, 然后进行绑定设置,最后指定路由key
- channel.exchangeDeclare(exchangeName, "topic", true);
-
- //Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
- // Map<String, Object> arguments) throws IOException;
- //queue:队列名称, durable:是否持久话;arguments:扩展参数
- String queueName = "test_confirm_querue";
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, exchangeName, routingKey);
-
- // 5, 创建消费者, 指定Consumer的channel
- QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
- // 设置消费者, 指定消费的队列, autoAck: 自动签收, callback:queueingConsumer
- channel.basicConsume(queueName, true, queueingConsumer);
-
- while (true){
- QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
- System.out.println("消费端" + new String(delivery.getBody()));
- }
- }
- }

1.Return Listener 用于处理一些不可路由的消息, 也是生产端需要设置的, 我的的消息生产者,用过指定一个Exchange和Routingkey ,把消息送达到某一个对队列中去,然后我们的消费者监听队列,进行消费的处理操作;但是在某些情况下,如果我们在发送消息的时候, 当前的exchange 不存在或者指定的路由key 路由不到, 这个时候如果我们需要监听这种不可达的消息, 就要使用Return Listener;
2、基础api中的一个非常重要的关键配置项
Mandatory:如果为true, 则监听器会接收到路由不可达的消息,然后进行后续处理, 如果为false, 那么broker端自动删除该消息;
看一下测试代码
生产者
- public class Produce {
-
- /**
- * @param args
- * @throws IOException
- * @throws TimeoutException
- */
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
-
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
-
- String exchangeName = "test_return_exchange";
- String routingKey = "return.save";
- String routingKeyError = "abc.save";
-
- channel.addReturnListener(new ReturnListener() {
- /**
- * int replyCode, 响应吗, 路由成没成功
- * String replyText, 回复内容
- * String exchange,
- * String routingKey,
- * AMQP.BasicProperties properties,
- * byte[] body 实际的消息体内容
- * @param replyCode
- * @param replyText
- * @param exchange
- * @param routingKey
- * @param properties
- * @param body
- * @throws IOException 跑出ioexception 异常
- */
-
- public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("handle return");
- System.out.println("replyCode: " + replyCode);
- System.out.println("exchange: " + exchange);
- System.out.println("routingKey: " + routingKey);
- System.out.println("properties: " + properties.toString());
- System.out.println("body: " + new String(body));
- }
- });
-
- String msg = "Hello RabbitMQ return Message";
- // Mandatory:如果为true, 则监听器会接收到路由不可达的消息,然后进行后续处理, 如果为false, 那么broker端自动删除该消息;
- channel.basicPublish(exchangeName, routingKeyError, true, null, msg.getBytes());
-
-
- }
- }

消费者
- public class Consumer {
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
-
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
-
- String exchangeName = "test_return_exchange";
- String routingKey = "return.#";
- String queueName = "test_return_queue";
- channel.exchangeDeclare(exchangeName, "topic", true, false, null);
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, exchangeName, routingKey);
- QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
-
- channel.basicConsume(queueName, true, queueingConsumer);
- while (true) {
- QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
- String msg = new String(delivery.getBody());
- System.out.println(msg);
- }
-
- }
- }

我们前面的demo 中是在代码中编写while循环,进行 queueingConsumer.nextDelivery(); 方法进行获取下一条消息,然后惊醒消费处理,这样是不合理的; 我们在工作中使用自定义的Consumer 更加的方便,解耦性更加的强,更加的优雅, 也是在实际工作中最常用的使用方式;
我们通过继承DefaultConsumer 方法重写handleDelivery(...) 方法来自定义监听
生产者:
- public class Produce {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- Connection connection = connectionFactory.newConnection();
-
- Channel channel = connection.createChannel();
- String exchangeName = "test_consumer_exchange";
- String routingKey = "consumer.save";
-
- // 发送5条消息
- String msg = "Hello RabbitMq sen consumer message";
- for (int i = 0; i < 5; i++) {
- channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
- }
- }
- }

消费者
- public class Consumer {
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
-
- String exchangeName = "test_consumer_exchange";
- String routingKey = "consumer.*";
- channel.exchangeDeclare(exchangeName, "topic", true);
- String queueName = "test_consumer_queue";
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, exchangeName, routingKey);
- // 5, 创建消费者, 指定Consumer的channel
- QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
- // 设置消费者, 指定消费的队列, autoAck: 自动签收, callback:queueingConsumer
- // MyConsumer 自定义的消费者, 去除之前的while 循环
- channel.basicConsume(queueName, true, new MyConsumer(channel));
- }
- }

自定义监听
- public class MyConsumer extends DefaultConsumer {
-
- public MyConsumer(Channel channel) {
- super(channel);
- }
-
- @Override
- public void handleDelivery(String consumerTag, // 消费者标签, 自动生成的
- Envelope envelope, //
- AMQP.BasicProperties properties,
- byte[] body)
- throws IOException {
- System.out.println("---handleDelivery----");
- System.out.println("consumerTag" + consumerTag);
- System.out.println("envelope" + envelope.toString());
- System.out.println("properties" + properties);
- System.out.println("body" + new String(body));
- }
-
- }
- 打印结果
- ---handleDelivery----
- consumerTagamq.ctag-3Rq24qZSqYFVX4lCkN_HKA
- envelopeEnvelope(deliveryTag=1, redeliver=false, exchange=test_consumer_exchange, routingKey=consumer.save)
- properties#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
- bodyHello RabbitMq sen consumer message
- ---handleDelivery----
- consumerTagamq.ctag-3Rq24qZSqYFVX4lCkN_HKA
- envelopeEnvelope(deliveryTag=2, redeliver=false, exchange=test_consumer_exchange, routingKey=consumer.save)
- properties#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
- bodyHello RabbitMq sen consumer message
- ---handleDelivery----
- consumerTagamq.ctag-3Rq24qZSqYFVX4lCkN_HKA
- envelopeEnvelope(deliveryTag=3, redeliver=false, exchange=test_consumer_exchange, routingKey=consumer.save)
- properties#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
- bodyHello RabbitMq sen consumer message

什么是限流:假设一个场景, 首先,我么的rabbitmq 服务器上有上万条未处理的消息,我们随便打开一个消费者的客户端,会出现下面的情况, 巨量的消息瞬间全部推送过来,但是我们的单个客户端无法同时处理这么多数据;
比如单个生产者一分钟生产几百条数据,但是消费端每分钟可能只能消费几十条数据,这个时候生产端和消费端存在严重的不平衡;消费端要做限流,是消费端更稳定;
RabbitMq 提供了一种qos(服务质量保证) 功能, 即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos 的值)未被确认前,不进行消费新的消息;
如果是在限流的情况下,一定不能设置自动签收,autoAck = false; 一定要手动的进行消费;
- void basicQos(int prefetchSize, 消费端 : 0, 不限制
- int prefetchCount, //一次最多处理多少条消息,一般在工作中设置为1 就好了
- boolean global // 这个限流策略是在什么上应用的, RabbitMq 上有两个级别, 1.channel, 2,consumer, 一般是false, true:在 channel, false:在consumer
- ) throws IOException
生产者
- public class Produce {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- Connection connection = connectionFactory.newConnection();
-
- Channel channel = connection.createChannel();
- String exchangeName = "test_qos_exchange";
- String routingKey = "qos.save";
-
- // 5 发送一条消息
- String msg = "Hello RabbitMq send qos message";
- for (int i = 0; i < 5; i++) {
- channel.basicPublish(exchangeName, routingKey, true,null, msg.getBytes());
- }
- }
- }

消费端
- public class Consumer {
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
-
- String exchangeName = "test_qos_exchange";
- String routingKey = "qos.#";
- String queueName = "test_qos_queue";
- channel.exchangeDeclare(exchangeName, "topic", true);
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, exchangeName, routingKey);
-
- //prefetchCount 消费端最大消费数量, 1为一条一条的处理
- channel.basicQos(0, 1, false);
- // 1. 限流方式; autoAck false, 签收方式设置为手动签收
- channel.basicConsume(queueName, false , new MyConsumer(channel));
- }
- }

自定义的监听
- public class MyConsumer extends DefaultConsumer {
-
-
- private Channel channel;
- public MyConsumer(Channel channel) {
- super(channel);
- this.channel = channel;
- }
-
- @Override
- public void handleDelivery(String consumerTag, // 消费者标签, 自动生成的
- Envelope envelope, //
- AMQP.BasicProperties properties,
- byte[] body)
- throws IOException {
- System.out.println("---handleDelivery----");
- System.out.println("consumerTag" + consumerTag);
- System.out.println("envelope" + envelope.toString());
- System.out.println("properties" + properties);
- System.out.println("body" + new String(body));
-
- //long deliveryTag,
- // boolean multiple, 是否批量删除, false 不支持批量签收
- //basicAck 主动给broker 推送消息, 若果basicAck 不打开只能控制台打印一条消息
- long deliveryTag = envelope.getDeliveryTag();
- channel.basicAck(deliveryTag, false);
- }
-
- }

消费端的手工ACK(确认)和NACK(没有确认,不成功); 消费端进行消费的时候, 如果由于业务异常我们可以进行日志的记录, 然后进行补偿;如果由于服务器宕机等严重的问题, 那我们就需要手工进行ACK保障消费端消费成功
消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker! 一般我们在实际应用中,都会关闭重回队列,也就是设置为false;
- public class Produce {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- Connection connection = connectionFactory.newConnection();
-
- Channel channel = connection.createChannel();
- String exchangeName = "test_ack_exchange";
- String routingKey = "ack.save";
-
-
- Map<String, Object> headers = new HashMap<String, Object>();
- for (int i = 0; i < 5; i++) {
- headers.put("num", i);
- AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
- .deliveryMode(2) // 投递模式
- .contentEncoding("UTF-8")
- .headers(headers)
- .build();
- String msg = "Hello RabbitMq End ack SMessage" + i;
- channel.basicPublish(exchangeName, routingKey, true,properties, msg.getBytes());
- }
- }
- }
-
- public class Consumer {
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
-
- String exchangeName = "test_ack_exchange";
- String routingKey = "ack.#";
- String queueName = "test_ack_queue";
- channel.exchangeDeclare(exchangeName, "topic", true);
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, exchangeName, routingKey);
-
- // 1. 手工签收,必须关闭autoAck = false;
- channel.basicConsume(queueName, false , new MyConsumer(channel));
- }
- }
-
-
- public class MyConsumer extends DefaultConsumer {
-
-
- private Channel channel;
-
- public MyConsumer(Channel channel) {
- super(channel);
- this.channel = channel;
- }
-
- @Override
- public void handleDelivery(String consumerTag, // 消费者标签, 自动生成的
- Envelope envelope, //
- AMQP.BasicProperties properties,
- byte[] body)
- throws IOException {
- System.out.println("---handleDelivery----");
- System.out.println("body" + new String(body));
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- if ((Integer) properties.getHeaders().get("num") == 0) {
- // 一般requeue 设置为false, 如果消息失败,会把这条消息添加到队列的尾端, 如果一直失败, 一直尝试
- channel.basicNack(envelope.getDeliveryTag(), false, true);
- }else {
- //long deliveryTag,
- // boolean multiple, 是否批量删除, false 不支持批量签收
- //basicAck 主动给broker 推送消息
- long deliveryTag = envelope.getDeliveryTag();
- channel.basicAck(deliveryTag, false);
- }
-
- }
-
- }

死信队列:DLX, Dead-letter-Exchange;
利用DLX ,当消息在一个队列中变成死信(dead message) 之后, 他能被冲新publish到另一个Exchange, 这个exchange 就是DLX
1、消息被拒绝(basic.reject/ basic.nack) 并且 requeue=false
2、下次TTL 过期
3、队列达到最大长度
2.
3
- public class Produce {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- Connection connection = connectionFactory.newConnection();
-
- Channel channel = connection.createChannel();
- String exchangeName = "test_dlx_exchange";
- String routingKey = "dlx.save";
-
-
- String msg = "Hello RabbitMq Send DLX SMessage" ;
-
- AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
- .deliveryMode(2)
- .contentEncoding("UTF-8")
- .expiration("10000")
- .build();
- channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
- }
- }
-
- public class Consumer {
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
- // 这就是一个普通的交换机和队列,交换机
- String exchangeName = "test_dlx_exchange";
- String routingKey = "dlx.#";
- String queueName = "test_dlx_queue";
- channel.exchangeDeclare(exchangeName, "topic", true);
- Map<String, Object> agruments = new HashMap<String, Object>();
- agruments.put("x-dead-letter-exchange","dlx.exchange");
-
- channel.queueDeclare(queueName, true, false, false, agruments);
- channel.queueBind(queueName, exchangeName, routingKey);
-
- // 要进行死信队列的声明,绑定
- channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
- channel.queueDeclare("dlx.queue",true, false, false, null);
- channel.queueBind("dlx.queue", "dlx.exchange","#");
-
- // 1. 手工签收,必须关闭autoAck = false;
- channel.basicConsume(queueName, true , new MyConsumer(channel));
- }
- }
-
- public class MyConsumer extends DefaultConsumer {
-
-
- private Channel channel;
-
- public MyConsumer(Channel channel) {
- super(channel);
- this.channel = channel;
- }
-
- @Override
- public void handleDelivery(String consumerTag, // 消费者标签, 自动生成的
- Envelope envelope, //
- AMQP.BasicProperties properties,
- byte[] body)
- throws IOException {
- System.out.println("---handleDelivery----");
- System.out.println("body" + new String(body));
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。