赞
踩
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.0</version>
<relativePath />
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
spring:
rabbitmq:
host: 127.0.0.1
username: guest
password: guest
virtualHost: /
profiles:
active: dev
server:
port: 8080
@SpringBootApplication
public class BlogCodeApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(BlogCodeApplication.class, args);
}
}
@Component
public class RabbitSender {
private RabbitTemplate rabbitTemplate;
public RabbitSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* 不显式声明交换机时并且发送消息不指定交换机,则默认使用Direct,
* 并且声明队列时,不显式绑定队列与交换机,则队列以队列名为routing-key
* 绑定到默认的direct交换机,
* 发送消息不指定交换机时,则将消息发到默认的direct交换机
*/
public void send() {
String msg = "Hello World!";
this.rabbitTemplate.convertAndSend("hello", msg);
System.out.println("发送消息" + msg + "至队列hello");
}
}
@SpringBootTest
public class RabbitTest {
@Autowired
private RabbitSender rabbitSender;
@Test
public void testSender() {
rabbitSender.send();
}
}
@Component
@RabbitListener(queues = "hello")
public class RabbitReceiver {
@RabbitHandler
public void receive(String in) {
System.out.println("Received '" + in + "'");
}
}
我们把一个消息转发给多个消费者,这种模式称之为发布-订阅模式
RabbitMq消息模式的核心思想是:一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。
实际上,生产者只能把消息发送给一个exchange,exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,它们把接收到的消息推送给队列
AnonymousQueue是非持久化、专有的、自动删除的、名字随机生成的队列
交换器与队列之间的关系称之为绑定关系
@Configuration
public class MqConfig {
@Bean
public FanoutExchange fanout() {
return new FanoutExchange("fanout");
}
private static class ConsumerConfig {
@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
}
}
}
@Component
public class FanoutSender {
private RabbitTemplate rabbitTemplate;
public FanoutSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send() {
String msg = "Hello World!";
// fanout为交换器名称
rabbitTemplate.convertAndSend("fanout", "", msg);
}
}
@Component
public class FanoutReceiver {
// autoDeleteQueue1.name为匿名队列的名称
@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in) {
System.out.println("临时队列1接收到消息:" + in);
}
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in) {
System.out.println("临时队列2接收到消息:" + in);
}
}
@SpringBootTest
public class RabbitTest {
@Autowired
private FanoutSender fanoutSender;
@Test
public void testFanoutSender() {
fanoutSender.send();
}
}
headers exchange是根据消息header值而不是routing key将消息路由到队列的交换器。
生产者在消息头中以键值对的形式添加一些值,并将其发送到headers exchange,
收到消息后,headers exchange尝试将所有或任何(基于x-match的值)header值与绑定到它的所有队列的值匹配。 如果找到匹配,则将消息路由到绑定值匹配的队列,如果未找到匹配,则忽略该消息
@Configuration
public class HeaderExchangeConfig {
@Bean
public HeadersExchange headers() {
return new HeadersExchange("headers");
}
private static class ConsumerConfig {
@Bean
public Queue headersAutoDeleteQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue headersAutoDeleteQueue2() {
return new AnonymousQueue();
}
@Bean
public Queue headersAutoDeleteQueue3() {
return new AnonymousQueue();
}
@Bean
public Binding headersBinding1(HeadersExchange headers, Queue headersAutoDeleteQueue1) {
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("h1", "Header1");
headerMap.put("h2", "Header2");
/**
* 匹配到任意一个就发送至队列
*/
return BindingBuilder.bind(headersAutoDeleteQueue1).to(headers).whereAny(headerMap).match();
}
@Bean
public Binding headersBinding2(HeadersExchange headers, Queue headersAutoDeleteQueue2) {
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("h1", "Header1");
headerMap.put("h2", "Header2");
/**
* 全部匹配到才会发送至队列
*/
return BindingBuilder.bind(headersAutoDeleteQueue2).to(headers).whereAll(headerMap).match();
}
@Bean
public Binding headersBinding3(HeadersExchange headers, Queue headersAutoDeleteQueue3) {
/**
* 匹配到任意一个就发送至队列, 此处与headersBinding1一致,为了证明其也有fanout模式的功能
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("h1", "Header1");
headerMap.put("h2", "Header2");
return BindingBuilder.bind(headersAutoDeleteQueue3).to(headers).whereAny(headerMap).match();
}
}
}
@Component
public class HeaderSender {
private RabbitMessagingTemplate rabbitMessagingTemplate;
public HeaderSender(RabbitMessagingTemplate rabbitMessagingTemplate) {
this.rabbitMessagingTemplate = rabbitMessagingTemplate;
}
public void send() {
String msg = "Hello World!";
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("h1", "Header1");
headerMap.put("h3", "Header3");
rabbitMessagingTemplate.convertAndSend("headers", "", msg, headerMap);
msg = "My Girl!";
headerMap.clear();
headerMap.put("h1", "Header1");
headerMap.put("h2", "Header2");
rabbitMessagingTemplate.convertAndSend("headers", "", msg, headerMap);
}
}
@Component
public class HeaderReceiver {
@RabbitListener(queues = "#{headersAutoDeleteQueue1.name}")
public void receive1(String in) {
System.out.println("临时队列1接收到消息:" + in);
}
@RabbitListener(queues = "#{headersAutoDeleteQueue2.name}")
public void receive2(String in) {
System.out.println("临时队列2接收到消息:" + in);
}
@RabbitListener(queues = "#{headersAutoDeleteQueue3.name}")
public void receive3(String in) {
System.out.println("临时队列3接收到消息:" + in);
}
}
@SpringBootTest
public class RabbitTest {
@Autowired
private HeaderSender headerSender;
@Test
public void testHeaderSender() {
headerSender.send();
}
}
从结果中不难看出,My Girl消息3个队列都匹配到headers, 而Hello Word消息只有队列1与3匹配到,毕竟他没有h2头
我们通过fanout exchange(扇型交换机)实现生产者发送一个消息,这个消息同时被传送给所有队列。但是有时我们不希望所有的消息都被所有队列接收,我们希望可以指定类型为a的消息只能被队列A接收,类型为b的消息只能被队列B,C接收。扇型交换机只能无脑地广播消息给所有的消费者,其实质是广播给所有关联的队列
为了实现这个功能,一种是建立多个交换机,这种方式简单暴力但是不灵活。本节我们介绍使用单个直连交换机+路由实现以上功能
在上图中,有2个队列绑定到直连交换机上。队列Q1使用绑定值为orange,队列Q2绑定值为black,green。在这种情况下,如果生产者发送的消息的路由值为orange,则此消息会被路由到队列Q1。如果生产者发送的消息的路由值为blcak,green,则此消息会被路由到队列Q2。其它的消息会被丢弃
我们也可以将相同的绑定值绑定到不同的队列中。如上图中,队列Q1和Q2使用的绑定值都black。如果生产者发送的消息的路由值为black,则此消息会被同时路由到队列Q1和队列Q2
@Configuration
public class DirectExchangeConfig {
@Bean
public DirectExchange direct() {
return new DirectExchange("direct");
}
private static class ConsumerConfig {
@Bean
public Queue directAutoDeleteQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue directAutoDeleteQueue2() {
return new AnonymousQueue();
}
@Bean
public Binding directBinding1(DirectExchange direct, Queue directAutoDeleteQueue1) {
return BindingBuilder.bind(directAutoDeleteQueue1).to(direct).with("orange");
}
@Bean
public Binding directBinding2(DirectExchange direct, Queue directAutoDeleteQueue2) {
return BindingBuilder.bind(directAutoDeleteQueue2).to(direct).with("black");
}
@Bean
public Binding directBinding3(DirectExchange direct, Queue directAutoDeleteQueue2) {
return BindingBuilder.bind(directAutoDeleteQueue2).to(direct).with("green");
}
}
}
@Component
public class DirectSender {
private RabbitTemplate rabbitTemplate;
public DirectSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send() {
rabbitTemplate.convertAndSend("direct", "orange", "orange msg");
rabbitTemplate.convertAndSend("direct", "green", "green msg");
rabbitTemplate.convertAndSend("direct", "black", "black msg");
}
}
@Component
public class DirectReceiver {
@RabbitListener(queues = "#{directAutoDeleteQueue1.name}")
public void receive1(String in) {
System.out.println("临时队列1接收到消息:" + in);
}
@RabbitListener(queues = "#{directAutoDeleteQueue2.name}")
public void receive2(String in) {
System.out.println("临时队列2接收到消息:" + in);
}
}
@SpringBootTest
public class RabbitTest {
@Autowired
private DirectSender directSender;
@Test
public void testDirectSender() {
directSender.send();
}
}
可以看到绑定路由键black与green的队列2接收到消息
topic主题交换器它根据在队列绑定的路由键和路由模式通配符匹配将消息路由到队列。
生产者在消息头中添加路由键并将其发送到主题交换器。 收到消息后,exchange尝试将路由键与绑定到它的所有队列的绑定路由模式匹配。 如果找到匹配,则将消息路由到其路由模式匹配的队列,如果未找到匹配,则忽略该消息
Routing key: 它是单词列表,由句点 (.) 分隔,例如“asia.china.beijing”
Routing Pattern: 它是在绑定队列时指定的模式,它是单词和通配符的列表,如*和#,由句点 (.) 分隔。 通配符的使用如下:
*: 用于匹配路由键中特定位置的单词,例如“asia.china.*”的路由模式将匹配第一个单词是asia而第二个单词的路由键单词是china,例如asia.china.beijing和asia.china.nanjing。
# :- 用于匹配零个或多个单词,例如asia.china.#的路由模式将匹配以asia.china开头的路由键,例如“asia.china”和 “亚洲.中国.北京”。
@Configuration
public class TopicExchangeConfig {
@Bean
public TopicExchange topic() {
return new TopicExchange("topic");
}
private static class ConsumerConfig {
@Bean
public Queue topicAutoDeleteQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue topicAutoDeleteQueue2() {
return new AnonymousQueue();
}
@Bean
public Binding topicBinding1(TopicExchange topic, Queue topicAutoDeleteQueue1) {
return BindingBuilder.bind(topicAutoDeleteQueue1).to(topic).with("asia.china.*");
}
@Bean
public Binding topicBinding2(TopicExchange topic, Queue topicAutoDeleteQueue2) {
return BindingBuilder.bind(topicAutoDeleteQueue2).to(topic).with("asia.china.#");
}
}
}
@Component
public class TopicSender {
private RabbitTemplate rabbitTemplate;
public TopicSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send() {
rabbitTemplate.convertAndSend("topic", "asia.china.beijing", "asia.china.beijing");
rabbitTemplate.convertAndSend("topic", "asia.china.nanjing", "asia.china.nanjing");
rabbitTemplate.convertAndSend("topic", "asia.china", "asia.china");
}
}
@Component
public class TopicReceiver {
@RabbitListener(queues = "#{topicAutoDeleteQueue1.name}")
public void receive1(String in) {
System.out.println("临时队列1接收到消息:" + in);
}
@RabbitListener(queues = "#{topicAutoDeleteQueue2.name}")
public void receive2(String in) {
System.out.println("临时队列2接收到消息:" + in);
}
}
@SpringBootTest
public class RabbitTest {
@Autowired
private TopicSender topicSender;
@Test
public void testTopicSender() {
topicSender.send();
}
}
可以看到对于#模式只有队列2接收到asia.china消息
欢迎关注公众号算法小生或沈健的技术博客shenjian.online
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。