当前位置:   article > 正文

【RabbitMQ】基本使用:Spring AMQP配置使用及SpringBoot整合_xml增加spring.rabbitmq.connection-timeout 的值

xml增加spring.rabbitmq.connection-timeout 的值

在上一篇 【RabbitMQ】Spring AMQP核心组件解析我们介绍了 Spring AMQP ,那它具体该如何使用呢?

1.xml配置方式

<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/" username="guest" 
                           password="guest" host="127.0.0.1" port="5672" />

<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1.1 Direct Exchange

<!--定义queue -->
<rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" 
              declared-by="connectAdmin" />

<!--定义direct exchange,绑定MY_FIRST_QUEUE -->
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" 
                        declared-by="connectAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey">
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" 
                 exchange="MY_DIRECT_EXCHANGE" />

<!--消息接收者 -->
<bean id="messageReceiver" class="com.mymq.consumer.FirstConsumer"></bean>

<!--queue listener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" />
</rabbit:listener-container>

<!--定义queue -->
<rabbit:queue name="MY_SECOND_QUEUE" durable="true" auto-delete="false" exclusive="false" 
              declared-by="connectAdmin" />

<!-- 将已经定义的Exchange绑定到MY_SECOND_QUEUE,注意关键词是key -->
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false"
                        declared-by="connectAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="MY_SECOND_QUEUE" key="SecondKey"></rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>

<!-- 消息接收者 -->
<bean id="receiverSecond" class="com.mymq.consumer.SecondConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener queues="MY_SECOND_QUEUE" ref="receiverSecond" />
</rabbit:listener-container>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

1.2 Topic Exchange

<!--定义queue -->
<rabbit:queue name="MY_THIRD_QUEUE" durable="true" auto-delete="false" exclusive="false" 
              declared-by="connectAdmin" />

<!-- 定义topic exchange,绑定MY_THIRD_QUEUE,注意关键词是pattern -->
<rabbit:topic-exchange name="MY_TOPIC_EXCHANGE" durable="true" auto-delete="false" 
                       declared-by="connectAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="MY_THIRD_QUEUE" pattern="#.Third.#"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" 
                 exchange="MY_TOPIC_EXCHANGE" />

<!-- 消息接收者 -->
<bean id="receiverThird" class="com.mymq.consumer.ThirdConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener queues="MY_THIRD_QUEUE" ref="receiverThird" />
</rabbit:listener-container>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

1.3 Fanout Exchange

<!--定义queue -->
<rabbit:queue name="MY_FOURTH_QUEUE" durable="true" auto-delete="false" exclusive="false" 
              declared-by="connectAdmin" />

<!-- 定义fanout exchange,绑定MY_FIRST_QUEUE 和 MY_FOURTH_QUEUE -->
<rabbit:fanout-exchange name="MY_FANOUT_EXCHANGE" auto-delete="false" durable="true"
                        declared-by="connectAdmin" >
    <rabbit:bindings>
        <rabbit:binding queue="MY_FIRST_QUEUE"></rabbit:binding>
        <rabbit:binding queue="MY_FOURTH_QUEUE"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- 消息接收者 -->
<bean id="receiverFourth" class="com.mymq.consumer.FourthConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener queues="MY_FOURTH_QUEUE" ref="receiverFourth" />
</rabbit:listener-container>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2.SpringBoot整合

使用了SpringBoot后,我们不再需要些上面那么多的配置,因为在RabbitAutoConfiguration中已经对Bean做了自动配置。

 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

2.1 定义队列、交换机

@Configuration
public class RabbitConfig {
    // 声明Queue、Exchange、Binding.......
}
  • 1
  • 2
  • 3
  • 4

1.定义交换机

@Bean("directExchange") // 定义的name会在绑定时用到
public DirectExchange getDirectExchange() {
    return new DirectExchange("DIRECT_EXCHANGE")
}

@Bean("topicExchange")
public TopicExchange getTopicExchange(){
    return new TopicExchange("TOPIC_EXCHANGE");
}

@Bean("fanoutExchange")
public FanoutExchange getFanoutExchange(){
    return  new FanoutExchange("FANOUT_EXCHANGE");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2.定义队列

@Bean("firstQueue")
public Queue getFirstQueue(){
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-message-ttl",6000); // 设置超时
    Queue queue = new Queue("FIRST_QUEUE", false, false, true, args);
    return queue;
}

@Bean("secondQueue")
public Queue getSecondQueue(){
    return new Queue("SECOND_QUEUE");
}

@Bean("thirdQueue")
public Queue getThirdQueue(){
    return new Queue("THIRD_QUEUE");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3.定义绑定

@Bean
public Binding bindFirst(@Qualifier("firstQueue") Queue queue, // 队列
                          @Qualifier("directExchange") TopicExchange exchange){ // 交换机
    return BindingBuilder.bind(queue).to(exchange).with("my"); // 路由键
}

@Bean
public Binding bindSecond(@Qualifier("secondQueue") Queue queue, 
                          @Qualifier("topicExchange") TopicExchange exchange){ 
    return BindingBuilder.bind(queue).to(exchange).with("#.my.#"); // 路由键有通配符
}

@Bean
public Binding bindThird(@Qualifier("thirdQueue") Queue queue,
                         @Qualifier("fanoutExchange") FanoutExchange exchange){
    return BindingBuilder.bind(queue).to(exchange); // fanout没有路由键
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2.2 Producer

void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
  • 1
@Component
public class MyProvider {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send(){
        // 发送4条消息        
        amqpTemplate.convertAndSend("DIRCET_EXCHANGE","my","a direct msg"); 

        amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "msg", "a topic msg1");
        amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "a.my.b", "a topic msg2");

        amqpTemplate.convertAndSend("FANOUT_EXCHANGE","","a fanout msg");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

2.3 Consumer

@Component
@RabbitListener(queues = "FIRST_QUEUE") // 指定监听的队列,
public class FirstConsumer {

    @RabbitHandler // 有消息时,就会自动调用当前方法
    public void process(String msg){
        System.out.println(" first queue received msg : " + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
@Component
@RabbitListener(queues = {"SECOND_QUEUE","THIRD_QUEUE"}) // 配置一个消费者监听多个队列
public class SecondConsumer {

    @RabbitHandler
    public void process(String msg){
        System.out.println(" second queue received msg : " + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.4 配置文件

全部配置总体上分成三大类:连接类、消息发送类、消息消费类。

1.连接配置

# rabbitmq连接基本配置
spring.rabbitmq.addresses=43.105.136.120:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 开启confirm机制
spring.rabbitmq.publisher-confirms=true
# 开启return模式
spring.rabbitmq.publisher-returns=true
# 配合return机制使用,表示接收路由不可达的消息
spring.rabbitmq.template.mandatory=true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
属性值说明默认值
address客户端连接的地址,有多个的时候用逗号分隔
改地址可以IP与port的结合
hostRabbitMQ的主机地址localhost
portRabbitMQ的端口号
virtual-host连接到RabbitMQ的虚拟主机
username登录到RabbitMQ的用户名
password登录到RabbitMQ的密码
ssl.enabled启用SSL支持false
ssl.key-store保存SSL证书的地址
ssl.key-store-password访问SSL证书的地址使用的密码
ssl.trust-storeSSL的可信地址
ssl.trust-store-password访问SSL的可信地址的密码
ssl.algorithmSSL算法,默认使用Rabbit的客户端算法库
cache.channel.checkout-timeout当缓存已满时,获取Channel的等待时间,单位为毫秒
cache.channel.size缓存中保持的Channe丨数量
cache.connection.mode连接缓存的模式CHANNEL
cache.connection.size缓存的连接数
connnection-timeout连接超时参数单位为毫秒:设置为“0”代表无穷大
dynamic默认创建—个AmqpAdmin的Beantrue

注:Producer与Consumer都需要先配置RabbitMQ连接信息。

2.消息发送配置(Producer)

# 开启confirm机制
spring.rabbitmq.publisher-confirms=true
# 开启return模式
spring.rabbitmq.publisher-returns=true
# 配合return机制使用,表示接收路由不可达的消息
spring.rabbitmq.template.mandatory=true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
属性值说明默认值
publisher-confirms开启 Publisher Confirm 机制
publisher-returns开启 Publisher Return 机制
template.mandatory启用强制信息false
template.receive-timeoutreceive()方法的超时时间0
template.reply-timeoutsendAndReceive()方法的超时时间5000
template.retry.enabled设置为true的时候RabbitTemplate能够实现重试false
template.retry.initial-interval第一次与第二次发布消息的时间间隔1000
template.retry.max-attempts尝试发布消息的最大数量3
template.retry.max-interval尝试发布消息的最大时间间隔10000
template.retry.multiplier上一次尝试时间间隔的乘数1.0

3.消息消费配置(Consumer)

# 设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
# 设置当前消费者数量(线程数)
spring.rabbitmq.listener.simple.concurrency=5
# 设置消费者最大并发数量
spring.rabbitmq.listener.simple.max-concurrency=10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
属性值说明默认值
listener.simple.acknowledge-mode容器的acknowledge模式
listener.simple.auto-startup肩动时自动启动容器true
listener.simple.concurrency消费者的最小数量
listener.simple.default-requeue-rejected投递失败时是否重新排队true
listener.simple.max-concurrency消费者的最大数量
listener.simple.missing-queues-fatal容器上声明的队列不可用时是否失敗
listener.simple.prefetch在单个请求中处理的消息个数,他应该大于等于事务数量
listener.simple.retry.enabled不论是不是重试的发布false
listener.simple.retry.initial-interval第一次与第二次投递尝试的时间间隔1000ms
listener.simple.retry.max-attempts尝试投递消息的最大数量3
listener.simple.retry.max-interval两次尝试的最大时间间隔10000ms
listener.simple.retry.multiplier上一次尝试时间间隔的乘数1.0
listener.simple.retry.stateless重试是有状态的还是无状态的true
listener.simple.transaction-size在一个事务中处理的消息数量 = 为了获得最佳效果,
该值应设罝为小于等于每个请求中处理的消息个数,即 listener.prefetch 的值
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号