赞
踩
spring: rabbitmq: host: 127.0.0.1 #ip port: 5672 #端口 username: guest #账号 password: guest #密码 virtualHost: #链接的虚拟主机 addresses: 127.0.0.1:5672 #多个以逗号分隔,与host功能一样。 requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s publisherConfirms: true #发布确认机制是否启用 #确认消息已发送到交换机(Exchange) #publisher-confirm-type参数有三个可选值: #SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。 #CORRELATED:消息从生产者发送到交换机后触发回调方法。 #NONE(默认):关闭发布确认模式。 #publisher-confirm-type: correlated #发布确认机制是否启用 高版本Springboot使用替换掉publisher-confirms:true publisherReturns: true #发布返回是否启用 connectionTimeout: #链接超时。单位ms。0表示无穷大不超时 ### ssl相关 ssl: enabled: #是否支持ssl keyStore: #指定持有SSL certificate的key store的路径 keyStoreType: #key store类型 默认PKCS12 keyStorePassword: #指定访问key store的密码 trustStore: #指定持有SSL certificates的Trust store trustStoreType: #默认JKS trustStorePassword: #访问密码 algorithm: #ssl使用的算法,例如,TLSv1.1 verifyHostname: #是否开启hostname验证 ### cache相关 cache: channel: size: #缓存中保持的channel数量 checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel connection: mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION size: #缓存的连接数,只有是CONNECTION模式时生效 ### listener listener: type: #两种类型,SIMPLE,DIRECT ## simple类型 simple: concurrency: #最小消费者数量 maxConcurrency: #最大的消费者数量 transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量 missingQueuesFatal: #是否停止容器当容器中的队列不可用 ## 与direct相同配置部分 autoStartup: #是否自动启动容器 acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量 defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) idleEventInterval: #container events发布频率,单位ms ##重试机制 retry: stateless: #有无状态 enabled: #是否开启 maxAttempts: #最大重试次数,默认3 initialInterval: #重试间隔 multiplier: #对于上一次重试的乘数 maxInterval: #最大重试时间间隔 direct: consumersPerQueue: #每个队列消费者数量 missingQueuesFatal: #...其余配置看上方公共配置 ## template相关 template: mandatory: #是否启用强制信息;默认false receiveTimeout: #`receive()`接收方法超时时间 replyTimeout: #`sendAndReceive()`超时时间 exchange: #默认的交换机 routingKey: #默认的路由 defaultReceiveQueue: #默认的接收队列 ## retry重试相关 retry: enabled: #是否开启 maxAttempts: #最大重试次数 initialInterval: #重试间隔 multiplier: #失败间隔乘数 maxInterval: #最大间隔
############################ base ############################ spring.rabbitmq.host: 服务Host spring.rabbitmq.port: 服务端口 spring.rabbitmq.username: 登陆用户名 spring.rabbitmq.password: 登陆密码 spring.rabbitmq.virtual-host: 连接到rabbitMQ的vhost spring.rabbitmq.addresses: 指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host) spring.rabbitmq.requested-heartbeat: 指定心跳超时,单位秒,0为不指定;默认60s spring.rabbitmq.publisher-confirms: 是否启用【发布确认】 #确认消息已发送到交换机(Exchange) #publisher-confirm-type参数有三个可选值: #SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。 #CORRELATED:消息从生产者发送到交换机后触发回调方法。 #NONE(默认):关闭发布确认模式。 #spring.rabbitmq.publisher-confirms-type: correlated #发布确认机制是否启用 高版本Springboot使用替换掉publisher-confirms:true spring.rabbitmq.publisher-returns: 是否启用【发布返回】 spring.rabbitmq.connection-timeout: 连接超时,单位毫秒,0表示无穷大,不超时 ############################ ssl ############################ spring.rabbitmq.ssl.enabled: 是否支持ssl spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路径 spring.rabbitmq.ssl.key-store-password: 指定访问key store的密码 spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store spring.rabbitmq.ssl.trust-store-password: 指定访问trust store的密码 spring.rabbitmq.ssl.trust-store-type: JKS:Trust store 类型 spring.rabbitmq.ssl.algorithm: ssl使用的算法,默认由rabiitClient配置,例如,TLSv1.1 spring.rabbitmq.ssl.validate-server-certificate=true:是否启用服务端证书验证 spring.rabbitmq.ssl.verify-hostname=true 是否启用主机验证 ############################ cache ############################ spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量 spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel spring.rabbitmq.cache.connection.size: 缓存的连接数,只有是CONNECTION模式时生效 spring.rabbitmq.cache.connection.mode: 连接工厂缓存模式:CHANNEL 和 CONNECTION ############################ listener ############################ spring.rabbitmq.listener.type=simple: 容器类型.simple或direct spring.rabbitmq.listener.simple.auto-startup: 是否启动时自动启动容器 spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量 spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量 spring.rabbitmq.listener.simple.prefetch: 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量. spring.rabbitmq.listener.simple.transaction-size: 指定一个事务处理的消息数量,最好是小于等于prefetch的数量. spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) spring.rabbitmq.listener.simple.missing-queues-fatal=true 若容器声明的队列在代理上不可用,是否失败; 或者运行时一个多多个队列被删除,是否停止容器 spring.rabbitmq.listener.simple.idle-event-interval: 多少长时间发布空闲容器时间,单位毫秒 spring.rabbitmq.listener.simple.retry.enabled: 监听重试是否可用 spring.rabbitmq.listener.simple.retry.max-attempts: 最大重试次数 spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔 spring.rabbitmq.listener.simple.retry.multiplier: 应用于上一重试间隔的乘数 spring.rabbitmq.listener.simple.retry.max-interval: 最大重试时间间隔 spring.rabbitmq.listener.simple.retry.stateless: 重试是有状态or无状态 spring.rabbitmq.listener.direct.acknowledge-mode= ack模式 spring.rabbitmq.listener.direct.auto-startup=true 是否在启动时自动启动容器 spring.rabbitmq.listener.direct.consumers-per-queue= 每个队列消费者数量. spring.rabbitmq.listener.direct.default-requeue-rejected= 默认是否将拒绝传送的消息重新入队. spring.rabbitmq.listener.direct.idle-event-interval= 空闲容器事件发布时间间隔. spring.rabbitmq.listener.direct.missing-queues-fatal=false若容器声明的队列在代理上不可用,是否失败. spring.rabbitmq.listener.direct.prefetch= 每个消费者可最大处理的nack消息数量. spring.rabbitmq.listener.direct.retry.enabled=false 是否启用发布重试机制. spring.rabbitmq.listener.direct.retry.initial-interval=1000ms # Duration between the first and second attempt to deliver a message. spring.rabbitmq.listener.direct.retry.max-attempts=3 # Maximum number of attempts to deliver a message. spring.rabbitmq.listener.direct.retry.max-interval=10000ms # Maximum duration between attempts. spring.rabbitmq.listener.direct.retry.multiplier=1 # Multiplier to apply to the previous retry interval. spring.rabbitmq.listener.direct.retry.stateless=true # Whether retries are stateless or stateful. ############################ template ############################ spring.rabbitmq.template.mandatory: 启用强制信息;默认false spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间 spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间 spring.rabbitmq.template.retry.enabled: 发送重试是否可用 spring.rabbitmq.template.retry.max-attempts: 最大重试次数 spring.rabbitmq.template.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔 spring.rabbitmq.template.retry.multiplier: 应用于上一重试间隔的乘数 spring.rabbitmq.template.retry.max-interval: 最大重试时间间隔
//常用的三个配置如下 //1---设置手动应答(acknowledge-mode: manual) // 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调 // publisher-confirm-type: correlated // #保证交换机能把消息推送到队列中 // publisher-returns: true // template: // #以下是rabbitmqTemplate配置 // mandatory: true) // 3---设置重试 @Configuration public class RabbitConfig { @Autowired private ConnectionFactory rabbitConnectionFactory; //@Bean 缓存连接池 //public CachingConnectionFactory rabbitConnectionFactory @Autowired private RabbitProperties properties; //这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉 // 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称 // @Bean // public ConnectionFactory connectionFactory() throws Exception { // //创建工厂类 // CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory(); // //用户名 // cachingConnectionFactory.setUsername("gust"); // //密码 // cachingConnectionFactory.setPassword("gust"); // //rabbitMQ地址 // cachingConnectionFactory.setHost("127.0.0.1"); // //rabbitMQ端口 // cachingConnectionFactory.setPort(Integer.parseInt("5672")); // // //设置发布消息后回调 // cachingConnectionFactory.setPublisherReturns(true); // //设置发布后确认类型,此处确认类型为交互 // cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); // // cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); // return cachingConnectionFactory; // } // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称 @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); containerFactory.setConnectionFactory(rabbitConnectionFactory); // 并发消费者数量 containerFactory.setConcurrentConsumers(1); containerFactory.setMaxConcurrentConsumers(20); // 预加载消息数量 -- QOS containerFactory.setPrefetchCount(1); // 应答模式(此处设置为手动) containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //消息序列化方式 containerFactory.setMessageConverter(new Jackson2JsonMessageConverter()); // 设置通知调用链 (这里设置的是重试机制的调用链) containerFactory.setAdviceChain( RetryInterceptorBuilder .stateless() .recoverer(new RejectAndDontRequeueRecoverer()) .retryOperations(rabbitRetryTemplate()) .build() ); return containerFactory; } // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称 @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate=new RabbitTemplate(rabbitConnectionFactory); //默认是用jdk序列化 //数据转换为json存入消息队列,方便可视化界面查看消息数据 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); //此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链 rabbitTemplate.setRetryTemplate(rabbitRetryTemplate()); //CorrelationData correlationData, boolean b, String s rabbitTemplate.setConfirmCallback( (correlationData, b, s) -> { System.out.println("ConfirmCallback "+"相关数据:"+ correlationData); System.out.println("ConfirmCallback "+"确认情况:"+b); System.out.println("ConfirmCallback "+"原因:"+s); }); //Message message, int i, String s, String s1, String s2 rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> { System.out.println("ReturnCallback: "+"消息:"+message); System.out.println("ReturnCallback: "+"回应码:"+i); System.out.println("ReturnCallback: "+"回应消息:"+s); System.out.println("ReturnCallback: "+"交换机:"+s1); System.out.println("ReturnCallback: "+"路由键:"+s2); }); return rabbitTemplate; } //重试的Template @Bean public RetryTemplate rabbitRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // 设置监听 调用重试处理过程 retryTemplate.registerListener(new RetryListener() { @Override public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) { // 执行之前调用 (返回false时会终止执行) return true; } @Override public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { // 重试结束的时候调用 (最后一次重试 ) System.out.println("---------------最后一次调用"); return ; } @Override public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { // 异常 都会调用 System.err.println("-----第{}次调用"+retryContext.getRetryCount()); } }); retryTemplate.setBackOffPolicy(backOffPolicyByProperties()); retryTemplate.setRetryPolicy(retryPolicyByProperties()); return retryTemplate; } @Bean public ExponentialBackOffPolicy backOffPolicyByProperties() { ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds(); long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds(); double multiplier = properties.getListener().getSimple().getRetry().getMultiplier(); // 重试间隔 backOffPolicy.setInitialInterval(initialInterval * 1000); // 重试最大间隔 backOffPolicy.setMaxInterval(maxInterval * 1000); // 重试间隔乘法策略 backOffPolicy.setMultiplier(multiplier); return backOffPolicy; } @Bean public SimpleRetryPolicy retryPolicyByProperties() { SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts(); retryPolicy.setMaxAttempts(maxAttempts); return retryPolicy; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。