赞
踩
创建springboot项目,多模块的方式实现消息队列的发布与消息消费。
按照上图方式创建项目及模块:
mq-demo/
consumer/
gradle
src
.gitignore
build.gradle
gradlew
gradlew.bat
HELP.md
settings.gradle
publisher/
gradle
src
.gitignore
build.gradle
gradlew
gradlew.bat
HELP.md
settings.gradle
build.gradle
settings.gradle
implementation 'org.springframework.boot:spring-boot-starter-amqp'
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: /
package com.example.publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
public class RabbitMQConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class);
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Autowired
private RabbitProperties properties;
// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(rabbitConnectionFactory);
// 并发消费者数量
containerFactory.setConcurrentConsumers(1);
containerFactory.setMaxConcurrentConsumers(20);
// 预加载消息数量 -- QOS
containerFactory.setPrefetchCount(1);
// 应答模式(此处设置为手动)
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//消息序列化方式
containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
// 设置通知调用链 (这里设置的是重试机制的调用链)
containerFactory.setAdviceChain(
RetryInterceptorBuilder
.stateless()
.recoverer(new RejectAndDontRequeueRecoverer())
.retryOperations(rabbitRetryTemplate())
.build()
);
return containerFactory;
}
// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate=new RabbitTemplate(rabbitConnectionFactory);
//默认是用jdk序列化
//数据转换为json存入消息队列,方便可视化界面查看消息数据
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
//此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());
//CorrelationData correlationData, boolean b, String s
rabbitTemplate.setConfirmCallback(
(correlationData, b, s) -> {
LOGGER.info("ConfirmCallback 相关数据:{}", correlationData);
LOGGER.info("ConfirmCallback 确认情况:{}", b);
LOGGER.info("ConfirmCallback 原因:{}", s);
});
rabbitTemplate.setReturnsCallback((message) -> {
LOGGER.info("ReturnCallback: 消息:{}", message);
});
return rabbitTemplate;
}
//重试的Template
@Bean
public RetryTemplate rabbitRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 设置监听 调用重试处理过程
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
// 执行之前调用 (返回false时会终止执行)
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
// 重试结束的时候调用 (最后一次重试 )
LOGGER.info("---------------最后一次调用");
return ;
}
@Override
public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
// 异常 都会调用
LOGGER.error("-----第{}次调用",retryContext.getRetryCount());
}
});
retryTemplate.setBackOffPolicy(backOffPolicyByProperties());
retryTemplate.setRetryPolicy(retryPolicyByProperties());
return retryTemplate;
}
@Bean
public ExponentialBackOffPolicy backOffPolicyByProperties() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();
long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();
double multiplier = properties.getListener().getSimple().getRetry().getMultiplier();
// 重试间隔
backOffPolicy.setInitialInterval(initialInterval * 1000);
// 重试最大间隔
backOffPolicy.setMaxInterval(maxInterval * 1000);
// 重试间隔乘法策略
backOffPolicy.setMultiplier(multiplier);
return backOffPolicy;
}
@Bean
public SimpleRetryPolicy retryPolicyByProperties() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts();
retryPolicy.setMaxAttempts(maxAttempts);
return retryPolicy;
}
}
package com.example.publisher;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class PublisherDemo {
@Autowired
private RabbitTemplate template;
@Scheduled(cron="10 * * * * ? ")
public void send() {
String message = "Hello World!";
this.template.convertAndSend("test-queue", message);
System.out.println(" PublisherDemo Send '" + message + "'");
}
}
2024-01-31T14:13:10.021+08:00 INFO 26676 --- [ scheduling-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2024-01-31T14:13:10.066+08:00 INFO 26676 --- [ scheduling-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#188598ad:0/SimpleConnection@61416cb3 [delegate=amqp://admin@127.0.0.1:5672/, localPort=64067]
2024-01-31T14:13:10.090+08:00 INFO 26676 --- [ scheduling-1] com.example.publisher.RabbitMQConfig : ---------------最后一次调用
PublisherDemo Send 'Hello World!'
2024-01-31T14:14:10.011+08:00 INFO 26676 --- [ scheduling-1] com.example.publisher.RabbitMQConfig : ---------------最后一次调用
PublisherDemo Send 'Hello World!'
package com.example.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerDemo {
@RabbitListener(queues = "test-queue")
@RabbitHandler
public void receive(String in) {
System.out.println(" ConsumerDemo Received '" + in + "'");
}
}
2024-01-31T14:24:34.067+08:00 INFO 22288 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#11d4dbd6:0/SimpleConnection@277bf091 [delegate=amqp://admin@127.0.0.1:5672/, localPort=64963]
2024-01-31T14:24:34.106+08:00 INFO 22288 --- [ main] c.example.consumer.ConsumerApplication : Started ConsumerApplication in 1.235 seconds (process running for 1.874)
ConsumerDemo Received '测试queue'
ConsumerDemo Received '"Hello World!"'
ConsumerDemo Received '"Hello World!"'
RabbitMQ 是一个开源的消息中间件,它实现了 AMQP(高级消息队列协议)标准。作为一个消息中间件,RabbitMQ 提供了可靠的消息传递机制,用于在分布式系统中进行异步通信。
以下是 RabbitMQ 的一些关键概念和特性:
消息队列:RabbitMQ 使用消息队列来存储和转发消息。生产者将消息发送到队列,而消费者从队列中接收并处理这些消息。
发布-订阅模式:RabbitMQ 支持发布-订阅模式,其中一个生产者可以将消息发送到多个消费者。
交换机:用于接收生产者发送的消息,并根据路由规则将其路由到一个或多个队列中。
队列:消息在队列中等待被消费。每个消息都有一个关联的路由键,用于将消息路由到特定的队列。
绑定:用于将交换机和队列关联起来,定义了消息如何从交换机路由到队列。
消费者确认机制:RabbitMQ 提供了消费者确认机制,确保消息被成功消费后再从队列中移除。
持久化:RabbitMQ 允许将消息和队列进行持久化,以确保在服务器重启时消息不会丢失。
RabbitMQ 的优点包括高可靠性、灵活的路由规则、可扩展性和丰富的客户端库支持。它被广泛应用于分布式系统、微服务架构和异步任务处理等场景中,用于解耦和提高系统的可靠性和可伸缩性。
注意:队列名称一定要保证消费者、生产者发送和消费的队列是同一个。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。