赞
踩
rabbitmq 版本为3.8版本,官方网址 : https://www.rabbitmq.com,采用集群镜像仲裁模式。
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server Connection:连接,应用程序与Broker的网络连接 TCP/IP/三次握手和四次挥手 Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。 Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。 Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力) Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key. Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。 Queue:队列也称为Message Queue,消息队列,保存消息并将它们转发给消费者。
解耦、削峰、异步
Exchange | Queue | Routing-key |
---|---|---|
EX.{交换机类型}.{模块名}.{功能名}.[业务名1].[业务名2]... | MQ.{模块名}.{功能名}.[业务名1].[业务名2]... | RK.{模块名}.{接收模块名}.{功能名}.[业务名1].[业务名2]... |
EX.TOPIC.PAY.UNION.OFFLINE | MQ.PAY.ORDER.UNION.OFFLINE | RK.PAY.UNION.OFFLINE |
package com.yuejf.utils.constant; /** * redis key * * @author lu */ public class RabbitMqConstants { private RabbitMqConstants() { } /** * app业务 */ public static class APP { public static final String APP = "APP."; /****测试开始*****/ public static final String TEST_EXCHANGE = TOPIC_EXCHANGE + APP + "TEST"; public static final String TEST_QUEUE = QUEUE + APP + "TEST"; /****测试结束*****/ } /** * 支付业务 */ public static class PAY { } /** * 订单业务 */ public static class ORDER { } /** * 商品业务 */ public static class GOODS { } /** * 交换机 */ //直接类型 public static final String TOPIC_EXCHANGE = "EX.TOPIC."; //主题类型 public static final String DELAY_EXCHANGE = "EX.DELAY."; //标题类型 public static final String FANOUT_EXCHANGE = "EX.FANOUT."; //扇出类型 public static final String HEADERS_EXCHANGE = "EX.HEADERS."; /** * 队列 Queue */ public static final String QUEUE = "MQ."; /** * Routing-key */ public static final String ROUTING_KEY = "RK."; } |
默认情况下RabbitMQ发送的消息是为字节码,我们采用统一的JSON格式的消息
如果规定了消息的格式为JSON,并使用消息转换器,则会自动将消息转化为JSON格式而不需要每次手动进行转换。
RabbitTemplate默认使用SimpleMessageConverter作为自己的消息转化器,而SimpleMessageConverter并不能满足JSON消息的需求。
我们可以使用Jackson2JsonMessageConverter作为默认的消息转换器。
@Configuration public class RabbitConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } } |
要保证消息不丢失,需要做到三点
ACK 机制就是消费者从 RabbitMQ 收到消息并处理完成后,反馈给 RabbitMQ,然后 RabbitMQ 收到反馈后才将此消息从队列中删除。
由于 ACK 机制本身必须回复给 RabbitMQ,消息才会丢弃这个特点。对于何时给 ACK,我们做开发的时候一定要在开发项目前提前规划好、设计好。
我们使用 RabbitMQ 通常不想在收到消息就立即给回 ACK 的,也不会设置 autoACK 机制即消费端收到自动返回一个 ACK 响应。一般来讲,我们都会根据业务逻辑的不同,会在不同的位置手动返回 ACK。
这时候,就可能出现问题:当收到消息,有时候处理业务逻辑报错了,往往在处理完业务逻辑就会忽略 ACK,这会导致消息始终卡死在 queue 里……如果数量越来越多,后续处理非常麻烦。
有时候消息投递出错,并不总是在应用接收的时候出了问题,会有很多非应用的问题。
比如:
消费端有问题,发出的消息被拒绝了。并且我们也设置了 requeue=false;
消息可能因为没有收到 ACK 超时被删除,或者消费端消费速度跟不上导致消息超时被删除;
消息数量超过了队列最大长度限制被抛弃;
消息总大小超过了队列消息总大小限制被抛弃。
对于这些问题,设置 dead letter exchanges 算是一个解决办法。
当消息一旦出现我上面列举出来的情况,就会被发送到我们设置的 dead letter exchanges。然后我们就可以对这些特殊情况的消息进行单独处理,这样的做法可以让我们的项目更健壮,更容易追踪问题。
RabbitMQ 的Exchange 就是消息交换机,它指定消息按什么规则,路由到哪个队列。
有四种类型:
Direct:处理路由键,需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键为“green”,则只有路由键为“green”的消息才被转发,不会转发路由键为"red",只会转发路由键为“green”。
Topic:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”只能匹配一个词。
Fanout:不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列上。
Headers:不处理路由键,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到 RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。
在这四种类型里,Direct 类型的 Exchange 投递消息是最快的。其他的 Exchange,MQ 还得花时间计算投递的位置。
所以,能使用 Direct 类型的建议使用 Direct。
官网 Consumer Acknowledgements and Publisher Confirms — RabbitMQ
# 连接到RabbitMQ的虚拟主机 , 注意预上线环境为review spring.rabbitmq.virtual-host=/test #确认消息已发送到交换机(Exchange) 异步 spring.rabbitmq.publisher-confirm-type=orrelated # 开启本地重试机制,默认3次,避免内存泄漏 spring.rabbitmq.listener.simple.retry.enabled=true # 客户端连接的地址,有多个的时候使用逗号分隔,该地址可以是IP与Port的结合 spring.rabbitmq.address # 当缓存已满时,获取Channel的等待时间,单位为毫秒 spring.rabbitmq.cache.channel.checkout-timeout # 缓存中保持的Channel数量 spring.rabbitmq.cache.channel.size # 连接缓存的模式 CHANNEL spring.rabbitmq.cache.connection.mode # 缓存的连接数 spring.rabbitmq.cache.connection.size # 连接超时参数单位为毫秒:设置为“0”代表无穷大 spring.rabbitmq.connnection-timeout # 默认创建一个AmqpAdmin的Bean true spring.rabbitmq.dynamic # RabbitMQ的主机地址 localhost spring.rabbitmq.host # 容器的acknowledge模式 spring.rabbitmq.listener.acknowledge-mode # 启动时自动启动容器 true spring.rabbitmq.listener.auto-startup # 消费者的最小数量 spring.rabbitmq.listener.concurrency # 消费者每次从队列获取的消息数量 (默认一次250个),配置消息的预读数量控制消费者一次性从队列中读取多少条消息,做到能者多劳的配置(因为在实际的生产环境中每个服务器的配置不可能完全相同,带来的处理消息的时间也不一样)。 spring.rabbitmq.listener.simple.prefetch # 投递失败时是否重新排队 true spring.rabbitmq.listener.default-requeue-rejected # 消费者的最大数量 spring.rabbitmq.listener.max-concurrency # 在单个请求中处理的消息个数,他应该大于等于事务数量 spring.rabbitmq.listener.prefetch # 不论是不是重试的发布 false spring.rabbitmq.listener.retry.enabled # 第一次与第二次投递尝试的时间间隔 1000 spring.rabbitmq.listener.retry.initial-interval # 尝试投递消息的最大数量 3 spring.rabbitmq.listener.retry.max-attempts # 两次尝试的最大时间间隔 10000 spring.rabbitmq.listener.retry.max-interval # 上一次尝试时间间隔的乘数 1.0 spring.rabbitmq.listener.retry.multiplier # 不论重试是有状态的还是无状态的 true spring.rabbitmq.listener.retry.stateless # 在一个事务中处理的消息数量。为了获得最佳效果,该值应设置为小于等于每个请求中处理的消息个数,即spring.rabbitmq.listener.prefetch的值 spring.rabbitmq.listener.transaction-size # 登录到RabbitMQ的密码 spring.rabbitmq.password # RabbitMQ的端口号 5672 spring.rabbitmq.port # 开启Publisher Confirm机制 false spring.rabbitmq.publisher-confirms # 开启publisher Return机制 false spring.rabbitmq.publisher-returns # 请求心跳超时时间,单位为秒 spring.rabbitmq.requested-heartbeat # 启用SSL支持 false spring.rabbitmq.ssl.enabled # 保存SSL证书的地址 spring.rabbitmq.ssl.key-store # 访问SSL证书的地址使用的密码 spring.rabbitmq.ssl.key-store-password # SSL的可信地址 spring.rabbitmq.ssl.trust-store # 访问SSL的可信地址的密码 spring.rabbitmq.ssl.trust-store-password # SSL算法,默认使用Rabbit的客户端算法库 spring.rabbitmq.ssl.algorithm # 启用强制信息 false。当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者; spring.rabbitmq.template.mandatory # receive()方法的超时时间 0 spring.rabbitmq.template.receive-timeout # sendAndReceive()方法的超时时间 5000 spring.rabbitmq.template.reply-timeout # 设置为true的时候RabbitTemplate能够实现重试 false spring.rabbitmq.template.retry.enabled # 第一次与第二次发布消息的时间间隔 1000 spring.rabbitmq.template.retry.initial-interval # 尝试发布消息的最大数量 3 spring.rabbitmq.template.retry.max-attempts # 尝试发布消息的最大时间间隔 10000 spring.rabbitmq.template.retry.max-interval # 上一次尝试时间间隔的乘数 1.0 spring.rabbitmq.template.retry.multiplier # 登录到RabbitMQ的用户名 spring.rabbitmq.username |
注意事项
涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析 RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化 SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
注意:SimpleMessageConverter(默认)和 Jackson2JsonMessageConverter 序列化后接受的参数不同,一个是有转义符,一个是没有的,所以,大的消息体用send发送,接收注意转义数据,一般消息体可以用convertAndSend 发送
@Configuration public class DirectRabbitConfig { @Bean public Queue TestDirectQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue(RabbitMqConstants.APP.TEST_QUEUE, true); } //Direct交换机 起名:TestDirectExchange @Bean public DirectExchange TestDirectExchange() { return new DirectExchange(RabbitMqConstants.APP.TEST_EXCHANGE, true, false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean public Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(RabbitMqConstants.APP.TEST_ROUTING_KEY); } } @RestController @Slf4j public class HelloController { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @RequestMapping("/sendDirectConvertMessage") public R<?> sendDirectConvertMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test convert message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //convertAndSend 方法相当于简化了的send方法,可以自动处理消息的序列化 rabbitTemplate.convertAndSend(RabbitMqConstants.APP.TEST_EXCHANGE, RabbitMqConstants.APP.TEST_ROUTING_KEY, map); return R.ok(); } @RequestMapping("/sendDirectMessage") public R<?> sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //JackJson默认序列化 ObjectMapper objectMapper = new ObjectMapper(); Message message = MessageBuilder.withBody(objectMapper.toString().getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("1") .setHeader("header", "header") .build(); rabbitTemplate.send(RabbitMqConstants.APP.TEST_EXCHANGE, RabbitMqConstants.APP.TEST_ROUTING_KEY, message); return R.ok(); } } @Slf4j @Component @RabbitListener(queues = RabbitMqConstants.APP.TEST_QUEUE) public class TestMqListener { @RabbitHandler public void receiveMsg(String msg, Channel channel, Message message) throws IOException { log.info("Str收到的消息-> msg:{}", msg); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception ex) { log.error("TestMqListener#Exception:{}", ex); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } @RabbitHandler public void receiveMsg(byte[] msg, Channel channel, Message message) throws IOException { log.info("By收到的消息-> msg:{}", msg.toString()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception ex) { log.error("TestMqListener#Exception:{}", ex); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } @RabbitHandler public void receiveMsg(Map msg, Channel channel, Message message) throws IOException { log.info("Map收到的消息-> msg:{}", msg); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception ex) { log.error("TestMqListener#Exception:{}", ex); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。