赞
踩
AMQP:Advanced Message Queuing Protocol,是用于在应用程序
或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合
微服务中独立性的要求。
SpringAMQP:Spring AMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
消息发送
在父工程中引入AMQP依赖依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写生产者的application.yml文件
spring:
rabbitmq:
host: 192.168.226.134
port: 5672
username: cytmq
password: 123321
virtual-host: /
编写测试代码
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void MessageQuerAmqp() {
String querName = "simple.queue";
String message = "hello AMQP";
rabbitTemplate.convertAndSend(querName,message);
}
消息接受:
编写在消费者的application.yml文件
spring:
rabbitmq:
host: 192.168.226.134
port: 5672
username: cytmq
password: 123321
virtual-host: /
在消费者服务中建立类,编写消费逻辑
/**
* @ClassName SpringAMQPListener
* @Description 添加描述
* @Author CYT
* @LastChangeDate 2023/3/22 20:16
* @Version v2.0.1
*/
@Component //管理Bean
public class SpringAMQPListener {
@RabbitListener(queues = "simple.queue") //监听simple.queue ,可以单个也可以多个
public void listenerSimpleQue(String msg){
System.out.println("消费者接受到simple.queue 的消息【"+msg+"】");
}
}
消息一旦被接受就会被销毁(阅后即焚)
实现一个队列绑定多个消费者
在消息发送者中定义每秒50条消息:
@Autowired
private RabbitTemplate rabbitTemplate;
public void MessageWorkQuerAmqp() throws InterruptedException {
String querName = "simple.queue";
String message = "hello AMQP--";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(querName,message+i);
Thread.sleep(20);
}
在消费者服务中取消息队列
@RabbitListener(queues = "simple.queue")
public void listenerWorkerQue1(String msg) throws InterruptedException {
System.out.println("消费者1接受到simple.queue 的消息【"+msg+"】"+ LocalTime.now());
Thread.sleep(20);
}
//LocalTime.now() 当前时间
@RabbitListener(queues = "simple.queue")
public void listenerWorkerQue2(String msg) throws InterruptedException {
System.err.println("消费者2....接受到simple.queue 的消息【"+msg+"】"+LocalTime.now());
Thread.sleep(200);
}
修改application文件
spring:
rabbitmq:
host: 192.168.226.134
port: 5672
username: cytmq
password: 123321
virtual-host: /
listener:
simple:
prefetch: 1 # 每次只能处理一条消息,处理完才能获取下一条消息
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
发布者将消息发给交换机,交换机再将消息转发到消息队列
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue。
在consumer)服务中,利用代码声明队列、交换机,并将两者绑定
/** * @ClassName FanoutConfig * @Description 添加描述 * @Author CYT * @LastChangeDate 2023/3/27 21:01 * @Version v2.0.1 */ @Configuration public class FanoutConfig { /** * @MethodName fanoutExchange * @Param null * @Author CYT * @Description 声明fanoutExchange交换机 * @Return FanoutExchange * @LastChangeDate 2023/3/27 * @Version v2.0.1 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("cyt.fanout"); } /** * @MethodName fanoutQueue * @Param null * @Author CYT * @Description 声明第一个队列 * @Return Queue * @LastChangeDate 2023/3/27 * @Version v2.0.1 */ @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } /** * @MethodName bindingQueue1 * @Param * @Author CYT * @Description 绑定队列1与交换机 * @Return * @LastChangeDate 2023/3/27 * @Version v2.0.1 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * @MethodName fanoutQueue * @Param null * @Author CYT * @Description 声明第二个队列 * @Return Queue * @LastChangeDate 2023/3/27 * @Version v2.0.1 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } //绑定第二个队列 @Bean public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")
public void listenerFanoutQue1(String msg) throws InterruptedException {
System.err.println("消费者接受到fanout.queue1 的消息【"+msg+"】"+LocalTime.now());
}
@RabbitListener(queues = "fanout.queue2")
public void listenerFanoutQue2(String msg) throws InterruptedException {
System.err.println("消费者接受到fanout.queue2 的消息【"+msg+"】"+LocalTime.now());
}
在publisher中编写测试方法,向itcast.fanout,发送消息
/** * @MethodName testSendFanoutExchange * @Param * @Author CYT * @Description 消息发送至交换机 * @Return * @LastChangeDate 2023/3/27 * @Version v2.0.1 */ @Test public void testSendFanoutExchange() { //交换机名称 String fanoutName = "cyt.fanout"; //消息 String message= "hello evero one"; //发消息 rabbitTemplate.convertAndSend(fanoutName,"",message); }
Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
代码实现:
利用@RabbitListener声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "cyt.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue.2
/** * @MethodName listenerDirectQueue2 * @Param msg 监听direct.queue2消息队列所获得的信息 * @Author CYT * @Description 路由交换机的声明、绑定、和监听 * @Return null * @LastChangeDate 2023/3/29 * @Version v2.0.1 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "cyt.direct",type = ExchangeTypes.DIRECT), key = {"red","yellow"} )) public void listenerDirectQueue2(String msg){ System.err.println("消费者接受到direct.queue2 的消息【"+msg+"】"+LocalTime.now()); }
在publisher中编写测试方法,向cyt.direct发送消息
/** * @MethodName testSendDirectRedExchange * @Param * @Author CYT * @Description 路由交换机的消息发送指定绑定key值为red的消息队列 * @Return * @LastChangeDate 2023/3/29 * @Version v2.0.1 */ @Test public void testSendDirectRedExchange() { //交换机名称 String fanoutName = "cyt.direct"; //消息 String message= "hello red"; //发消息 rabbitTemplate.convertAndSend(fanoutName,"red",message); }
与路由交换机相比,Topic BandingKEY支持通配符,和多项选择
消息队列的获取
/** * @MethodName listenerTopicQueue2 * @Param * @Author CYT * @Description Topic Exchange消息接受key中含有news的消息队列 * @Return * @LastChangeDate 2023/3/29 * @Version v2.0.1 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "cyt.topic",type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenerTopicQueue2(String msg){ System.out.println("消费者接受到Topic.queue2 的消息【"+msg+"】"+LocalTime.now()); } /** * @MethodName listenerTopicQueue1 * @Param * @Author CYT * @Description Topic Exchange消息接受key值中含有china的所有消息 * @Return * @LastChangeDate 2023/3/29 * @Version v2.0.1 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "cyt.topic",type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenerTopicQueue1(String msg){ System.out.println("消费者接受到Topic.queue1 的消息【"+msg+"】"+LocalTime.now()); }
消息的发送
/** * @MethodName testSendTopExchange * @Param * @Author CYT * @Description TOPic消息发送 routingKey可以有多个用【.】链接 * @Return * @LastChangeDate 2023/3/29 * @Version v2.0.1 */ @Test public void testSendTopExchange() { //交换机名称 String fanoutName = "cyt.topic"; //消息 String message= "cc"; //发消息 rabbitTemplate.convertAndSend(fanoutName,"china.cc",message); }
在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型
的消息,SpringAMQP会帮我们序列化为字节后发送。
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter:来处理的。而
默认实现是SimpleMessageConverter,基于DK的ObjectOutputStream完成序列化。
解决办法
只需要定义一个MessageConverter类型的Bean即可。推荐用SON方式序列化,步骤如下:
在父工程中引入jackson-databind依赖
<!-- jackson-databind依赖 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
在消息发送的启动类中声明Bean
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
在消费者服务中获取消息
/**
* @MethodName listenerObjectQueue
* @Param [Map<String,Object> msg],对象类型的msg
* @Author CYT
* @Description 接收对象类型的消息
* @Return void
* @LastChangeDate 2023/3/30
* @Version v2.0.1
*/
@RabbitListener(queues = "object,queue")
public void listenerObjectQueue(Map<String,Object> msg){
System.out.println("消费者接受到object,queue 的消息【"+msg+"】"+LocalTime.now());
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。