赞
踩
消息分为:
1、同步消息
2、异步消息
处理消息的角色分为:
1、消息发送方
2、消息接收方
企业级应用中广泛使用的三种异步消息传递技术:
1、JMS(Java Message Service):一种规范,类似于JDBC规范,提供了与消息服务相关的API接口。
2、AMQP(advanced message queuing protocol):一种协议(高级消息队列协议,也是消息代理规范),规范了网络交换的数据格式,兼容JMS。用于不同语言开发的系统之间参照此协议来通信。优点是:具有跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现。
3、MQTT(Message Queueing Telemetry Transport):消息队列遥测传输,专为小设备设计,是物联网(IOT)生态系统中主要成分之一。
JMS消息模型:
1、peer-to-peer:点对点模型,消息发送到一个队列中,队列保存消息。队列的消息只能被一个消费者消费或超时。
2、publish-subscribe:发布订阅模型,消息可以被多个消费者消费,生产者和消费者完全独立,不需要感知对方的存在。
JMS消息种类:
1、TextMessage
2、MapMessage
3、ByteMessage(常用)
4、StreamMessage
5、ObjectMessage
6、Message(只有消息头和属性)
实现JMS规范的消息中间件:
1、ActiveMQ
2、Redis
3、HornetMQ
4、RabbitMQ
5、RocketMQ(没有完全遵守JMS规范)
AMQP消息模型:
1、direct exchange(常用)
2、fanout exchange
3、topic exchange
4、headers exchange
5、system exchange
AMQP消息种类:
byte[]:字节数组,统一了格式。解决了跨平台的问题。
实现AMQP协议的消息中间件:
1、RabbitMQ
2、StormMQ
3、RocketMQ
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
spring:
activemq:
# activemq 服务的连接地址
broker-url: tcp://localhost:61616
jms:
# 此属性设置为 true 时,使用的是JMS消息模型中的发布订阅消息模型
pub-sub-domain: true
// OrderService.java
public interface OrderService {
void order(String id);
}
// OrderServiceImpl.java import com.example.springboot.service.MessageService; import com.example.springboot.service.OrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class OrderServiceImpl implements OrderService { @Autowired private MessageService ms; @Override public void order(String id) { // 此处简单展示:省略中间过程各种服务的调用,处理各种业务i= System.out.println("订单处理开始..."); // 发送短信消息 ms.sendMessage(id); System.out.println("订单处理结束..."); System.out.println("------------------------"); } }
// MessageService.java
public interface MessageService {
void sendMessage(String id);
String receiveMessage();
}
// MessageServiceActiveMQImpl.java import com.example.springboot.service.MessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; @Service public class MessageServiceActiveMQImpl implements MessageService { @Autowired private JmsMessagingTemplate messagingTemplate; @Override public void sendMessage(String id) { System.out.println("待发送的短信已进入消息队列,id:" + id); messagingTemplate.convertAndSend("activemq.order.queue.id", id); } @Override public String receiveMessage() { String id = messagingTemplate.receiveAndConvert("activemq.order.queue.id", String.class); System.out.println("已完成短信发送业务,id:" + id); return id; } }
import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; @Component public class MessageListener { // 自动执行中间件中的消息消费,当监听到中间件中有消息时,自动消费 // 参数 id 的类型为放到消息中的消息类型 @JmsListener(destination = "activemq.order.queue.id") // @SendTo 注解将 ”activemq.order.queue.id“ 这个队列中的消息处理完后, // 将receive方法的返回值放到物流(logistics)队列中,以此来实现消息的自动流转。 // 然后可以再定义物流队列的监听器,看实际需求 @SendTo("activemq.logistics.queue.id") public String receive(String id){ System.out.println("已完成短信发送业务,id:" + id); return id; } }
订单处理开始...
待发送的短信已进入消息队列,id:1
已完成短信发送业务,id:1
订单处理结束...
------------------------
订单处理开始...
待发送的短信已进入消息队列,id:2
已完成短信发送业务,id:2
订单处理结束...
------------------------
订单处理开始...
待发送的短信已进入消息队列,id:3
已完成短信发送业务,id:3
订单处理结束...
------------------------
RabbitMQ 基于 Erlang 语言编写,需要安装 Erlang。
下载地址:
链接
安装按照默认配置安装即可,Erlang安装完成后,配置环境变量:
RabbitMQ下载地址:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.7/rabbitmq-server-3.10.7.exe
两个安装完后重启电脑。
// RabbitMQConfigDirect.java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfigDirect { @Bean // 定义队列 public Queue directQueue(){ // 第二个参数:消息队列是否持久化 // 第三个参数:当前消息队列是否当前连接专用 // 第四个参数:当生产者消费者都不再使用此队列,是否删除此队列 return new Queue("direct_queue", true, false, false); } @Bean // 定义交换机,一个交换机可以绑定多个队列 public DirectExchange directExchange(){ return new DirectExchange("direct_exchange"); } @Bean // 定义路由器 public Binding directBinding(){ return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct"); } }
// MessageServiceRabbitMQDirectImpl.java import com.example.springboot.service.MessageService; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MessageServiceRabbitMQDirectImpl implements MessageService { @Autowired private AmqpTemplate amqpTemplate; @Override public void sendMessage(String id) { amqpTemplate.convertAndSend("direct_exchange", "direct", id); System.out.println("(Rabbitmq direct)待发送的短信已进入消息队列,id:" + id); } @Override public String receiveMessage() { return null; } }
// MessageRabbitMQDirectListener.java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageRabbitMQDirectListener {
@RabbitListener(queues = "direct_queue")
public void receive(String id){
System.out.println("(Rabbitmq direct)已完成短信发送业务,id:" + id);
}
}
// MessageRabbitMQDirectListener2.java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageRabbitMQDirectListener2 {
@RabbitListener(queues = "direct_queue")
public void receive(String id){
System.out.println("(Rabbitmq direct 2)已完成短信发送业务,id:" + id);
}
}
订单处理开始... (Rabbitmq direct)待发送的短信已进入消息队列,id:1 订单处理结束... ------------------------ (Rabbitmq direct)已完成短信发送业务,id:1 订单处理开始... (Rabbitmq direct)待发送的短信已进入消息队列,id:2 订单处理结束... ------------------------ (Rabbitmq direct 2)已完成短信发送业务,id:2 订单处理开始... (Rabbitmq direct)待发送的短信已进入消息队列,id:3 订单处理结束... ------------------------ (Rabbitmq direct)已完成短信发送业务,id:3 订单处理开始... (Rabbitmq direct)待发送的短信已进入消息队列,id:4 订单处理结束... ------------------------ (Rabbitmq direct 2)已完成短信发送业务,id:4 订单处理开始... (Rabbitmq direct)待发送的短信已进入消息队列,id:5 订单处理结束... ------------------------ (Rabbitmq direct)已完成短信发送业务,id:5
// RabbitMQConfigTopic.java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfigTopic { @Bean // 定义队列 public Queue topicQueue(){ // 第二个参数:消息队列是否持久化 // 第三个参数:当前消息队列是否当前连接专用 // 第四个参数:当生产者消费者都不再使用此队列,是否删除此队列 return new Queue("topic_queue", true, false, false); } @Bean // 定义队列 public Queue topicQueue2(){ // 第二个参数:消息队列是否持久化 // 第三个参数:当前消息队列是否当前连接专用 // 第四个参数:当生产者消费者都不再使用此队列,是否删除此队列 return new Queue("topic_queue2", true, false, false); } @Bean // 定义交换机,一个交换机可以绑定多个队列 public TopicExchange topicExchange(){ return new TopicExchange("topic_exchange"); } @Bean // 定义路由器,路由器可以定义规则 // * 用来代表一个单词,且该单词是必须出现的 public Binding TopicBinding(){ return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.id"); } @Bean // 定义路由器,路由器可以定义规则 // # 用来表示任意数量的单词,可以为0个单词 public Binding TopicBinding2(){ return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#"); } }
// MessageServiceRabbitMQTopicImpl.java import com.example.springboot.service.MessageService; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MessageServiceRabbitMQTopicImpl implements MessageService { @Autowired private AmqpTemplate amqpTemplate; @Override public void sendMessage(String id) { // 第一个参数是交换机 // 第二个参数是routingKey,如果其可以匹配上配置类中绑定着topic_exchange交换机的n个路由器的话, // 那么这个消息就会被分发到n和队列中,被消费n次 // 第三个参数是,发送到消息队列的消息对象 amqpTemplate.convertAndSend("topic_exchange", "topic.order.id", id); System.out.println("(Rabbitmq topic)待发送的短信已进入消息队列,id:" + id); } @Override public String receiveMessage() { return null; } }
// MessageRabbitMQTopicListener.java import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Configuration; @Configuration public class MessageRabbitMQTopicListener { @RabbitListener(queues = "topic_queue") public void receive(String id){ System.out.println("(Rabbitmq topic)已完成短信发送业务,id:" + id); } @RabbitListener(queues = "topic_queue2") public void receive2(String id){ System.out.println("(Rabbitmq topic 2)已完成短信发送业务,id:" + id); } }
订单处理开始...
(Rabbitmq topic)待发送的短信已进入消息队列,id:1
订单处理结束...
------------------------
(Rabbitmq topic 2)已完成短信发送业务,id:1
(Rabbitmq topic)已完成短信发送业务,id:1
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
rocketmq:
name-server: localhost:9876
producer:
group: group_rocketmq
// MessageServiceRocketMQImpl.java import com.example.springboot.service.MessageService; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MessageServiceRocketMQImpl implements MessageService { @Autowired private RocketMQTemplate rocketMQTemplate; @Override public void sendMessage(String id) { // convertAndSend是同步方法,在实际生产过程中不建议使用 // rocketMQTemplate.convertAndSend("rocketmq_order_queue_id", id); // 实际生产过程中使用异步方法: SendCallback callback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("通过异步消息方式,消息发送成功"); } @Override public void onException(Throwable throwable) { System.out.println("通过异步消息方式,消息发送失败"); } }; rocketMQTemplate.asyncSend("rocketmq_order_queue_id", id, callback); System.out.println("(Rocketmq)待发送的短信已进入消息队列,id:" + id); } @Override public String receiveMessage() { return null; } }
// MessageServiceRocketMQListener.java
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "rocketmq_order_queue_id", consumerGroup = "group_rocketmq")
public class MessageServiceRocketMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("(Rocketmq)已完成短信发送业务,id:" + message);
}
}
订单处理开始...
(Rocketmq)待发送的短信已进入消息队列,id:1
订单处理结束...
------------------------
通过异步消息方式,消息发送成功
(Rocketmq)已完成短信发送业务,id:1
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。