赞
踩
docker pull rabbitmq:management
# 创建文件夹
mkdir /app/mq/data
# 编辑配置
vim /app/mq/rabbitmq.conf
# 配置内容
#开启默认账号
loopback_users.guest = true
#监听对外通信端口配置
listeners.tcp.default = 5672
#集群节点发现配置
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
#集群节点1(后续基于节点1扩展多节点)
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1
vim /app/mq/.erlang.cookie
BVWDHYVHEZGVSKPDYUDK
**集群中各节点.erlang.cookie内容应一致**
# 修改cookie文件的权限
chmod 600 /caso/mq/.erlang.cookie
docker network create mq-net
docker run -d --hostname rabbitmq1 --restart unless-stopped --name rabbitmq1 --net mq-net -p 4369:4369 -p 25672:25672 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v /app/mq/data:/var/lib/rabbitmq -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie rabbitmq:management 参数解释: -d 容器后台运行 --hostname rabbitmq1 容器的主机名为 rabbitmq1 --restart=unless-stopped docker 容器重启后重启MQ --name rabbitmq1 容器名为rabbitma1,在宿主机上运行“docker ps”命令时显示的名称 -p 4369:4369 EPMD(Erlang端口映射守护进程)端口号 -p 25672:25672 节点间和CLI工具通信(Erlang分发服务器端口) -p 5672:5672 消息通讯端口 -p 15672:15672 后台管理端口 -e RABBITMQ_DEFAULT_USER=root 设置rabbitmq默认用户为root -e RABBITMQ_DEFAULT_PASS=123456 设置rabbitmq默认密码为123456 -v /app/mq/data:/var/lib/rabbitmq 挂载/app/mq/data目录到容器 -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf 映射配置文件 -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie 映射cookie文件,也可以使用-e RABBITMQ_ERLANG_COOKIE='rabbitmq' 设置 rabbitmq的cookie为“rabbitmq”,可以自定义为其他文本,容器保持一致即可。因为新版本中RABBITMQ_ERLANG_COOKIE环境变量方式已被标记为弃用,在需要加载cookie的命令执行时会有警告,未来版本该方式会被移除,推荐映射文件方式部署
# 下载插件包
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
**注意插件版本要与rabbitmq版本对应**
mkdir /app/mq/mq-plugins
将插件上传到对应目录
# 拷贝插件到容器
docker cp /app/mq/mq-plugins/rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq1:/plugins
# 进入容器
docker exec -it rabbitmq1 /bin/bash
# 验证插件是否存在
cd plugins
ls |grep delay
# 启动延时消息插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 启动消息追踪日志插件
rabbitmq-plugins enable rabbitmq_tracing
exit
docker restart rabbitmq1
<!-- 消息中间件 rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.0.4</version>
</dependency>
spring: rabbitmq: host: 192.168.0.210 port: 5672 username: root password: 123456 # 消费者确认机制相关配置 # 开启publisher-confirm, # 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调ConfirmCallback publisher-confirm-type: correlated # publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback publisher-returns: true # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息 template: mandatory: true listener: simple: # 一次拉取的数量(默认值:250,强制要求顺序消费配置1) prefetch: 250 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。) concurrency: 10 # 消费端的监听最大个数 max-concurrency: 100 # 消息手动确认 acknowledge-mode: manual # 消费者retry重试机制 retry: enabled: true # 每次失败后等待时长的翻倍数 multiplier: 1 # 最大重试次数 max-attempts: 3 # 重试状态 stateless: true direct: auto-startup: true acknowledge-mode: manual retry: enabled: true max-attempts: 3 type: simple
@Configuration public class MqProducerConsumerConfig { /** * 延迟交换机 */ public static final String DELAY_EXCHANGE_NAME = "member.delay.exchange"; /** * 延迟队列 */ public static final String DELAY_QUEUE_NAME = "member.delay.queue"; /** * 延迟队列路由key */ public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey"; /** * 声明延迟交换机 * * @return CustomExchange 延迟交换机 */ @Bean public CustomExchange delayExchange() { HashMap<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args); } /** * 声明延迟队列 * * @return Queue 延迟队列 */ @Bean public Queue delayQueue() { return new Queue(DELAY_QUEUE_NAME); } /** * 设置延迟队列的绑定关系 * * @return Binding 延迟队列的绑定关系 */ @Bean public Binding delayBinding() { return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_QUEUE_ROUTING_KEY).noargs(); } }
@Configuration public class MqErrorMsgConfig { /** * 交换机 */ public static final String EXCHANGE_NAME = "member.error.exchange"; /** * 队列 */ public static final String QUEUE_NAME = "member.error.queue"; /** * 队列路由key */ public static final String QUEUE_ROUTING_KEY = "error"; /** * 声明失败消息交换机 * * @return 交换机 */ @Bean public DirectExchange errorExchange() { return new DirectExchange(EXCHANGE_NAME); } /** * 声明失败消息队列 * * @return 消息队列 */ @Bean public Queue errorQueue() { // 设置队列持久化 return new Queue(QUEUE_NAME, true); } /** * 绑定交换机、消息队列,指定对应的队列路由key * * @return 绑定 */ @Bean public Binding errorBind() { return BindingBuilder.bind(errorQueue()).to(errorExchange()).with(QUEUE_ROUTING_KEY); } /** * 实现MessageRecover接口的子类RepublishMessageBuilder * * @param rabbitTemplate rabbitTemplate * @return 异常消息 */ @Bean public MessageRecoverer republishMessageBuilder(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE_NAME, QUEUE_ROUTING_KEY); } }
@Component public class MqProducerMsgSendUtil { private static final Logger log = LoggerFactory.getLogger(MqProducerMsgSendUtil.class); @Resource private RabbitTemplate rabbitTemplate; public MqProducerMsgSendUtil() { } public void send(String delayExchangeName, String delayQueueRoutingKey, String msg, Integer delayTime) { log.info("发送mq消息:{},延时:{} ms接收", msg, delayTime); this.rabbitTemplate.convertAndSend(delayExchangeName, delayQueueRoutingKey, msg, (message) -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setHeader("x-delay", delayTime); return message; }); } }
@Slf4j @Component public class RabbitMqMessageListener { @Autowired private MqProducerMsgSendUtil mqProducerMsgSendUtil; /** * rabbitmq延迟消息监听 * * @param message 消息 * @param c 信道 * @param msg 消息内容 */ @RabbitListener(queues = DELAY_QUEUE_NAME) public void receiveMqDelayMsg(Message message, Channel c, String msg) throws Exception { MessageProperties properties = message.getMessageProperties(); String receivedExchange = properties.getReceivedExchange(); String consumerQueue = properties.getConsumerQueue(); String routingKey = properties.getReceivedRoutingKey(); log.info("收到 交换机:{} 队列:{} 路由键:{} 消息:{}", receivedExchange, consumerQueue, routingKey, msg); Boolean haveConsumer = false; // 是需要处理的消息 if (msg.contains(RedisKeyConstants.PAY_ORDER_STATE)) { haveConsumer = true; log.info("执行xxx业务 >>> 消息:{}", msg); ... ... ... // 手动回执,签收 long tag = properties.getDeliveryTag(); c.basicAck(tag, false); } // 没有合适消费者,抛异常消息重发 if (!haveConsumer) { long tag = properties.getDeliveryTag(); // 应答,防止Unacked堆积 c.basicAck(tag, false); log.error("交换机:{} 队列:{} 路由键:{} 消息:{} 没有正常消费,异常重试!", receivedExchange, consumerQueue, routingKey, msg); // 需要将异常抛出才能触发retry重试机制 throw new Exception(MqExceptionEnum.MESSAGE_CONSUMPTION_FAILED.getMsg()); } } }
# 节点2 docker network create mq-net docker run -d --hostname rabbitmq2 --add-host rabbitmq3:x.x.x.x --add-host rabbitmq1:x.x.x.x --restart unless-stopped --name rabbitmq2 --net mq-net -p 4369:4369 -p 25672:25672 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v /app/mq/data:/var/lib/rabbitmq -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie rabbitmq:management # 节点3 docker network create mq-net docker run -d --hostname rabbitmq3 --add-host rabbitmq2:x.x.x.x --add-host rabbitmq1:x.x.x.x --restart unless-stopped --name rabbitmq3 --net mq-net -p 4369:4369 -p 25672:25672 -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v /app/mq/data:/var/lib/rabbitmq -v /app/mq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v /app/mq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie rabbitmq:management # 节点1 host增加节点2、节点3映射 docker exec -it rabbitmq1 /bin/bash apt-get update apt-get install vim vim /etc/hosts # 节点2 x.x.x.x rabbitmq2 # 节点3 x.x.x.x rabbitmq3 #重新加载节点; #停止MQ rabbitmqctl stop_app; #resetMQ rabbitmqctl reset; #启动MQ rabbitmqctl start_app; #退出并重启容器 exit; docker restart rabbitmq1; # 节点2加入 #进入容器 docker exec -it rabbitmq2 bash; #停止MQ rabbitmqctl stop_app; #resetMQ rabbitmqctl reset; #启动插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange; #跟机器1的消息队列建立关系 rabbitmqctl join_cluster --ram rabbit@rabbitmq1; #修改节点类型 rabbitmqctl change_cluster_node_type disc #重新启动MQ rabbitmqctl start_app; #退出并重启容器 exit; docker restart rabbitmq2; # 节点3加入 #进入容器 docker exec -it rabbitmq3 bash; #停止MQ rabbitmqctl stop_app; #resetMQ rabbitmqctl reset; #启动插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange; #跟机器1的消息队列建立关系 rabbitmqctl join_cluster --ram rabbit@rabbitmq1; #修改节点类型 rabbitmqctl change_cluster_node_type disc #重新启动MQ rabbitmqctl start_app; #退出并重启容器 exit; docker restart rabbitmq3;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。