赞
踩
Spring有很多不同的项目,其中就有对AMQP的支持:
Spring AMQP的页面 https://spring.io/projects/spring-amqp
Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。
链接:https://pan.baidu.com/s/1PT_d64NuUSQhzNPpx_WDMw?pwd=4mur
提取码:4mur
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
@Configuration
public class RabbitmqConfig {
//创建基本队列
@Bean
public Queue basicQueue(){
//name队列名称;durable 是否持久化 false不持久化; exclusive是否独有的 ;autoDelete 是否自动删除 ;argument 参数
//return new Queue("common-queue"); 上面也就是默认的 默认 持久化,非独占,不是自动删除;
return new Queue("hello",true,false,false,null);
}
@RestController public class BasicProducer { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/basicProducer") public void producer(){ System.out.println("发送基本模型消息"); rabbitTemplate.convertAndSend("hello","基本模型消息"); /** * convertAndSend比较常用参数: * exchange 指定交换机、 * RoutingKey 路由key、 * object 消息体、 */ } }
@Component
public class BasicConsumer {
@RabbitListener(queues = "hello")
public void listener(String msg){
System.out.println("监听到的消息==="+msg);
}
}
相较于基本消息模型,word消息模型 可以有2个以上消费者共同消费队列中的消息
与基本消息模型一样
与基本消息模型一样
写2个以上消费者
消费者共同消费队列中的消息
默认公平轮询;
在yml中添加以下配置。
listener:
simple:
# 公平分发
prefetch: 1
将消息交给所有绑定到交换机的队列
创建一个交换机多个队列
@Configuration public class RabbitmqConfig { //创建fanout交换机 @Bean public FanoutExchange fanoutExchange() { // System.out.println("交换机"); return new FanoutExchange("fanout-exchange"); } //创建fanout队列 @Bean public Queue fanoutQueue1(){ return new Queue("fanout-queue1",true,false,false,null); } //创建fanout队列 @Bean public Queue fanoutQueue2(){ return new Queue("fanout-queue2",true,false,false,null); } //把队列绑定到交换机 @Bean public Binding fanoutQueueTofanoutExchange(){ return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } @Bean public Binding fanoutQueueTofanoutExchange2(){ return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); }
@RestController public class FanoutProducer { @Autowired RabbitTemplate rabbitTemplate; /** * fanout广播 */ @RequestMapping("/fanoutProducer") public void fanoutProducer(){ for(int i=0;i<10;i++){ rabbitTemplate.convertAndSend("fanout-exchange","","广播消息"); } System.out.println("发送成功"); } }
@Component public class FanoutConsumer { /** * fanout 消息模型 */ @RabbitListener(queues = "fanout-queue1") public void listenerFanout1(String msg) { System.out.println("fanout接受消息1" + msg); } /** * fanout 消息模型 */ @RabbitListener(queues = "fanout-queue2") public void listenerFanout2(String msg) { System.out.println("fanout接受消息2" + msg); } }
把消息交给符合指定routing key 的队列
创建一个交换机多个队列
@Configuration public class RabbitmqConfig { //创建direct交换机 @Bean public DirectExchange directExchange() { // System.out.println("交换机"); return new DirectExchange("direct-exchange"); } //创建direct队列 @Bean public Queue directQueue1(){ return new Queue("direct-queue1",true,false,false,null); } @Bean public Queue directQueue2(){ return new Queue("direct-queue2",true,false,false,null); } //把direct队列绑定到交换机 @Bean public Binding directQueueTodirectExchange1(){ return BindingBuilder.bind(directQueue1()).to(directExchange()).with("error"); } @Bean public Binding directQueueTodirectExchange2(){ return BindingBuilder.bind(directQueue2()).to(directExchange()).with("info"); }
根据不同的routingkey 将消息发送不同的队列中
@RestController
public class DirectProducer {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* direct定向
* */
@RequestMapping("/directProducer")
public void directProducer(){
rabbitTemplate.convertAndSend("direct-exchange","info","定向消息error");
System.out.println("发送成功");
}
}
@Component public class DirectConsumer { /** * direct 消息模型 */ @RabbitListener(queues = "direct-queue1") public void listenerDirect1(String msg){ System.out.println("direct接受消息1" + msg); } /** * direct 消息模型 */ @RabbitListener(queues = "direct-queue2") public void listenerDirect2(String msg){ System.out.println("direct接受消息2" + msg); } }
把消息交给符合routing pattern(路由模式) 的队列
创建一个交换机多个队列
@Configuration public class RabbitmqConfig { //创建Topic交换机 @Bean public TopicExchange topicExchange() { // System.out.println("交换机"); return new TopicExchange("topic-exchange"); } //创建topic队列 @Bean public Queue topicQueueInfo(){ return new Queue("topic-queue-info",true,false,false,null); //return new Queue("common-queue"); 上面也就是默认的 } //创建topic队列 @Bean public Queue topicQueueError(){ return new Queue("topic-queue-error",true,false,false,null); //return new Queue("common-queue"); 上面也就是默认的 } //把topic队列绑定到交换机 @Bean public Binding InfoQueueToTopicExchange(){ return BindingBuilder.bind(topicQueueInfo()).to(topicExchange()).with("info.*"); } @Bean public Binding ErrorQueueToTopicExchange(){ return BindingBuilder.bind(topicQueueError()).to(topicExchange()).with("common.*"); }
根据不同的routingkey 将消息发送不同的队列中
@RestController
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public void send(String msg){
System.out.println(msg);
rabbitTemplate.convertAndSend("topic-exchange","info.one",msg);
}
@Component
public class TopicConsumer {
@RabbitListener(queues="topic-queue-info")
public void listenerInfo(String msg, Message message, Channel channel) throws IOException {
System.out.println("进入消费者info"+msg);
}
@RabbitListener(queues="topic-queue-error")
public void listenerError(String msg, Message message, Channel channel) throws IOException {
System.out.println("进入消费者Error"+msg);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。