赞
踩
helm pull bitnami/rabbitmq --version 12.1.3
tar -zxvf rabbitmq-12.1.3.tgz
添加延时消息插件
global:
  imageRegistry: ""
  ## E.g.
  ## imagePullSecrets:
  ##   - myRegistryKeySecretName
  ##
  imagePullSecrets: []
  storageClass: "nfs-client"
auth:
  ## @param auth.username RabbitMQ application username
  ## ref: https://github.com/bitnami/containers/tree/main/bitnami/rabbitmq#environment-variables
  ##
  username: 你的用户名
  ## @param auth.password RabbitMQ application password
  ## ref: https://github.com/bitnami/containers/tree/main/bitnami/rabbitmq#environment-variables
  ##
  password: "你自己的密码"
communityPlugins: "https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez"
## @param extraPlugins Extra plugins to enable (single string containing a space-separated list)
## Use this instead of `plugins` to add new plugins
##
extraPlugins: "rabbitmq_auth_backend_ldap rabbitmq_delayed_message_exchange"
## @param replicaCount Number of RabbitMQ replicas to deploy
##
replicaCount: 3
helm install rabbitmq rabbitmq/
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
/**
 * Exposes a JSON-based {@link MessageConverter} bean built on
 * {@link Jackson2JsonMessageConverter}.
 *
 * @return the configured converter
 */
@Bean
public MessageConverter messageConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    // Have the converter create message ids automatically; an id tells messages apart
    // and lets business code check whether a message is a duplicate.
    converter.setCreateMessageIds(true);
    return converter;
}
rabbitmq:
  host: 你的IP
  port: 你的端口
  username: 你的名称
  password: 你的密码
  virtual-host: /
  connection-timeout: 200ms
  template:
    retry:
      enabled: true
  # 生产确认机制 开启会稍微影响点性能 根据业务情况是否需要开启
  publisher-confirm-type: correlated
  listener:
    simple:
      # 每次只能获取一条消息,处理完才能获取下一个消息
      prefetch: 1
      # 收消息确认机制 交给Spring事务管理
      acknowledge-mode: auto
      retry:
        enabled: true
创建一个错误消息队列 方便人工介入
package com.connm.common.mq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 创建失败错误交换机和队列 方便以后人工干预 */ @Configuration @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") public class ErrorRabbitConfig { @Bean public DirectExchange errorMessageExchange() { return new DirectExchange("error.direct"); } @Bean public Queue errorQueue() { return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) { return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
package com.connm.common.mq; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFutureCallback; @Component public class RabbitHelper { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * @param exchange 交换机 * @param routingKey 路由key * @param msg 消息 */ public void sendMessage(String exchange, String routingKey, Object msg) { rabbitTemplate.convertAndSend(exchange, routingKey, msg); } /** * 发送确认消息 * @param exchange 交换机 * @param routingKey 路由key * @param msg 消息 * @param listenableFutureCallback 确认回调 */ public void sendMessageWithConfirm(String exchange, String routingKey, Object msg, ListenableFutureCallback listenableFutureCallback) { // 1.创建CorrelationData CorrelationData correlationData = new CorrelationData(); // 2.给Future添加ConfirmCallback correlationData.getFuture().addCallback(listenableFutureCallback); // 3.发送消息添加消息确认回调 rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData); } /** * 发送延时消息 * @param exchange 交换机 * @param routingKey 路由key * @param msg 消息 * @param delay 毫秒 */ public void sendDelayMessage(String exchange, String routingKey, Object msg, int delay) { rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 添加延迟消息属性 message.getMessageProperties().setDelay(delay); return message; } }); } /** * 发送延时确认消息 * @param exchange 交换机 * @param routingKey 路由key * @param msg 消息 * @param delay 毫秒 * @param listenableFutureCallback 确认回调 */ public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, int delay, 
ListenableFutureCallback listenableFutureCallback) { // 1.创建CorrelationData CorrelationData correlationData = new CorrelationData(); // 2.给Future添加ConfirmCallback correlationData.getFuture().addCallback(listenableFutureCallback); rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 添加延迟消息属性 message.getMessageProperties().setDelay(delay); return message; } }, correlationData); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。