赞
踩
优点:老牌,稳定性高,成熟度高,集群架构模式好(Zookeeper),对性能要求不高的可以使用,它只需要引入依赖即可,SpringBoot自身集成了。
缺点:性能低,延迟高;
只要有足够大的内存,就能承担很大的数据传输;
RocketMQ是阿里开源的消息中间件,目前也已经孵化为Apache顶级项目,它是纯java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,它对消息的可靠传输以及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
集群拓扑:(替换了Zookeeper,因为它性能低,换成了Name Server)
它的分布式事务等很多功能是需要商业版才能有的,需要收费。它高性能,可靠性,支持水平扩展,它有一个最大的问题:商业收费。
能保障数据不丢失,可做高可用,负载均衡,性能也很高,建议使用。
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的;
很多大厂如滴滴、美团、头条等,因为:
RabbitMQ高性能的原因?
什么是AMQP高级消息 队列协议?
AMQP协议模型:
Publisher 推送消息前先与Server建立连接,找到Virtual host,然后将消息推送至Exchange交换机。而交换机与Message Queue有绑定关系(一个交换机相当于一个独立的虚拟机,而这个虚拟机内的各种独立的应用就相当于一个Queue,这个Queue与交换机绑定),Consumer通过绑定的对队列,而交换机也绑定了队列。发送者将消息发送给交换机,这样就能完成消息的推送了。
生产者将消息发送至交换机,交换机将消息发送至指定的队列,而消费者则通过绑定的队列拿到此消息。
下载的版本不一定找最新的版本,要找最稳定的(版本的升级会伴随一定的风险)。同时要照顾到整体架构,比如其他的是否支持最新版本等,在升级上也要注意是否值得升级。
rpm一键安装是最简单的安装方式,初学者可以使用此方式
erlang要先安装;
rmp -ivh erlang-18.3-1.e17.centos.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
rpm -ivh socat-1.7.3.2-5.e17.lux.x86.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
在配置文件中即可配置端口号,内存等操作。后面细讲,这里先做到能使用即可。
yum install
build-essential openssl openssl-devel unixODBC unixODBC-devel
make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.e17.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.e17.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
比如修改密码、配置等等,例如:loopback_users中的<<“guest”>>,只保留guest
rabbitmq-server start &
rabbitmqctl app_stop
rabbitmq-plugins enable rabbitmq_management
集群配置失败,故障转移等情况下可以将启动失败的节点给移除掉。它可以在不启动的情况下对节点的摘除
命令行的操作能做的,可视化界面也可以做的。
右上角:
可视化界面:
Type类型:
在Admin中可以配置账户以及账户的权限(如操作虚拟主机的权限)。(右边有选择框)
导入导出迁移备份:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
public class Consumer{
public static void main(String[] args) throws Exception{
// 1. 创建一个ConnectionFactory, 并进行配置
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2. 通过连接工厂创建连接
Connection connection =connectionFactory.newConnection();
//3. 通过connection创建一个Channel
Channel channel=connection.craeteChannel();
//4. 通过Channel发送数据
for(int i=0;i<5;i++){
String msg="Hello RabbitMQ!";
channel.basicPublish("","test001",null,msg.getBytes());
}
//5. 记得要关闭相关的连接
channel.close();
connection.close();
}
}
public class Consumer{
//1. 创建一个ConnectionFactory,并进行配置
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2. 通过连接工厂创建连接
Connection connection=connectionFactory.newConnection();
//3. 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4. 声明(创建)一个队列
String queueName="test001";
channel.queueDeclare("test001",true,false,false,null);
//5. 创建消费者
QueueingConsumer queueingConsumer=new QueueingConsumer(channel);
//6. 设置Channel
channel.basicConsume(queueName,true,queueingConsumer);
//7. 获取消息
while(true){
Delivery delivery=queueingConsumer.nextDelivery();
String msg=new String(delivery.getBod());
//Evelope envelope=delivery.getEnvelope();
System.out.println("消费端:"+msg);
}}}
我们在发送消息的过程中必须指定一个Exchange,如果指定的Exchange为空的话,它会使用默认的Exchange;
注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。
符合条件的,被匹配到的都可以被接收到消息。
还有一些其他的但不是很常用,这里不赘述了;
服务器和应用程序之间传送的数据;
本质上就是一段数据,由Properties和Payload(Body)组成
常用属性:delivery mode、headers(自定义属性)
其他属性:
生产者自定义Header代码示例:
public class Procuder{
public static void main(String[] args) throws Exception{
// 1. 创建一个ConnectionFactory ,并进行配置
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2. 通过连接工厂连接
Connection connection=connectionFactory.newConnection();
//3. 通过connection创建一个Channel
Channel channel=connection.createChannel();
Map<String,Object> headers=new HashMap<>();
headers.put("my1","111");
headers.put("my2","222");
AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.headers(headers)
.build();
//4. 通过Channel发送数据
channel.basicPublish("","test001",null,msg.getBytes());
}
消费者自定义获取header里信息代码示例:
public class Consumer{
//1. 创建一个ConnectionFactory,并进行配置
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2. 通过连接工厂创建连接
Connection connection=connectionFactory.newConnection();
//3. 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4. 声明(创建)一个队列
String queueName="test001";
channel.queueDeclare("test001",true,false,false,null);
//5. 创建消费者
QueueingConsumer queueingConsumer=new QueueingConsumer(channel);
//6. 设置Channel
channel.basicConsume(queueName,true,queueingConsumer);
//7. 获取消息
while(true){
Delivery delivery=queueingConsumer.nextDelivery();
String msg=new String(delivery.getBod());
//Evelope envelope=delivery.getEnvelope();
System.out.println("消费端:"+msg);
}}}
本章小结: RabbitMQ的概念、安装与使用、管控台操作,结合RabbitMQ的特性、Exchange、Queue、Binding、RoutingKey、Message进行核心API的讲解,通过本章的学习,希望大家对RabbitMQ有一个初步的认知!
什么是生产端的可靠性投递?
生产端:
这里的打标,我们可以在消息将要发出的时候,将发出消息的状态修改,当确认收到了消息之后再修改状态。做一个定期轮询检查是否漏发,如果有则重新发送。
简单来说,就是先将要发送消息的订单入库,然后再发送消息,如果消息未发送成功则进行补偿重发(延迟检查如五分钟后),最好是不做事务(影响性能),少入DB。
幂等性是什么?
消费端-幂等性保障
在业务逻辑中,如果使用了数据库和Redis,在进行数据的流转中,Redis和数据库的事务不是一样的,要考虑到如何使事务一致性,同时成功同时失败等问题。
public class Producer{
public static void main(String[] args){
//1. 创建ConnectionFactory
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("192.168.11.76")
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 获取Connection
Connection connection=ConnectionFactory.newConnection();
// 3. 通过Connection创建一个新的Channel
Channel channel=connection.createChannel();
// 4. 指定我们的消息投递模式:消息的确认模式
channel.confirmSelect();
String exchangeName="test_confirm_exchange";
String routingKey="confirm.save";
// 5. 发送一条消息
String msg="Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
// 6. 添加一个确认监听
channel.addConfirmListener(new ConfirmListener(){
@Override
public void handleNack(long deliveryTag,boolean multiple) throws IOException{
System.err.println("-------no ack!-------");
}
@Override
public void handleAck(long deliveryTag,boolean multiple) throws IOException{
System.err.println("---------ack!----------");
}
})
} }
public class Producer{
public static void main(String[] args)throws Exception{
// 1. 创建ConnectionFactory
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2. 获取connection
Connection connection =connectionFactory.newConnection();
//3. 通过Conneciton创建一个新的Channle
String exchangeName="test_confirm_exchange";
String routingKey="confirm.#";
String queueName="test_confirm_queue";
// 4. 声明交换机和队列 然后进行绑定设置,最后制定路由key
channel.exchangeDeclare(exchangeName,"topic",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
// 5.创建消费者
QueueingConsumer queueingConsumer=new QueueingConsumer(channel);
channel.basicConsume(queueName,true,queueingConsumer);
while(true){
Delivery delivery=queueingConsumer.nextDelivery();
String msg=new String(delivery.getBody());
System.err.println("消费端"+msg);
}}
}
Return Lis tener 用于处理一些不可路由的消息!
我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener!
在基础API中有一个关键的配置项:
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
默认为false,当我们使用Return 消息机制的时候,我们需要将它设置为true;
继承DefaultConsumer的此类被写出后,需要进行绑定。(在交换机绑定时绑定自定义的Consumer);
TTL:
图示:
这里可以配置队列的相关参数配置;
交换机与交换机之间也可以进行绑定
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
@Configuration
@ComponentScan("com.bfxy.spring.*") //让此类被扫描到
public class RabbitMQConfig{
@Bean
public ConnectionFactory connectionFactory(){ //如果bean没有给name,就默认为方法的名称
CachingConnectionFactory connectionFactory=new CachingConnectionFactory();
connectionFactory.setAddress("192.168.11.76:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests{
@Test
public void contextLoads(){}
}
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testAdmin() throws Exception{
rabbitAdmin.declareExchange(new DirectExchange("test.direct",false,false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic",false,false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout",false,false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue",false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue",false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue",false));
rabbitAdmin.declareBinding(new Binding("test.direct.queue",Binding.DestinationType.QUEUE,"test.direct","direct",new HashMap<>()));
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.topic.queue",false)); //直接创建队列
.to(new TopicExchange("test.topic",false,false)) //直接创建交换机,建立关联关系
.with("user.#"));
rabbitAdmin.declarebinding(
BindingBuilder
.bind(new Queue("test.fanout.queue",false))
.to(new FanoutExchange("test.fanout",false,false));
)
//清空队列数据
rabbitAdmin.purgeQueue("test.topic.queue",false);
}
常见的队列:1. FanoutExchange:将消息分发到所有的绑定队列,无RoutingKey的概念 2.HeadersExchange:通过添加属性key-value匹配 3. DirectExchange:按照routingKey分发到指定队列 4. TopicExchange:多关键词匹配
@Bean
public TopicExchange exchange(){
return new TopicExchange("topic001",true,false);
}
@Bean
public Queue queue(){
return new Queue("queue001",true); //队列持久
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange()).with("spring.*"); //这里的with内容为routKey,可自定义
}
RabbitMQConfig.java 核心代码如下:
@Configuration
@ComponentScan({com.bfxy.spring.*}) //这里的路径换成自己的路径
public class RabbitMQConfig{
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory=new CachingConnectionFactory();
connectionFactory.setAddress("192.168.11.76:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
@Bean
public TopicExchange exchange(){
return new TopicExchange("topic001",true,false);
}
@Bean
public Queue queue(){
return new Queue("queue001",true); //队列持久
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange()).with("spring.*"); //这里的with内容为routKey,可自定义
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate =new RabbitTemplate(connectionFactory);
return rabbitTemplate; //在这里的上面可以对RabbitTemplate设置一些属性,最后再返回;
}
}
测试类中测试
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() throws Exception{
//1. 创建消息
MessageProperties messageProperties =new MessageProperties();
messageProperties.getHeaders().put("desc","信息描述...");
messageProperties.getHeaders().put("type","自定义消息类型..");
Message message=new Message("Hello RabbitMQ".getBytes(),messageProperteis);
rabbitTemplate.converAndSend("topic001","spring.amqp",message,new MessagePostProcessor(){
@Overide
public Message postProcessMessage(Message message) throws AmqpException{
System.err.println("----------添加额外的配置-------");
message.getMessageProperties().getHeaders().put("desc","额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr","额外新加的属性"); //新加的属性不一定设置在Header,我们也可以设置在别的地方
return message;
}
});
}
// 简单版 最后面的方法是可以不要的
@Test
public void testSendMessage2() throws Exception{
//创建消息
MessageProperties messageProperties=new MessageProperties();
messageProperteis.setContentType("text/plain"); //文本类型
Message message=new Message("mq 消息".getBytes(),messageproperties);
rabbitTemplate.converAndSend("topic001","spring.amqp",message);
}
// 最简单版(消息体也可以不要,可直接传文本内容)
@Test
public void testSendMessage3() throws Exception{
rabbitTemplate.converAndSend(“topic001”,“spring.amqp”,“我是一段消息内容”);
}
}
注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大;
思考一下:SimpleMessageListenerContainer为什么可以动态感知配置变更?
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectioNFactory connectionFactory){
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(),queue002()); //包含的队列
container.setConcurrentConsumers(1); //当前消费者数量
container.setMaxConcurrentConsumers(5); //最大数量
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgMode.AUTO);
container.setConsumerTagStrategy(new ConsumerTagStrategy(){
@Overide
public String createConsumerTag(String queue){
return queue+"_"+UUID.randomUUID().toString();
}
});
container.setMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message,Channel channel) throws Exception{
String msg=new String(message.getBody());
System.err.println("----消费者:---"+msg);
}
})
}
我们在SimpleMessageListenerContainer 中给队列名设置了队列+UUID的形式,于是在可视化界面中就看到了;
比如我们上面使用SimpleMessageListenerContainer 定义了同一个队列的不同消费者(队列名+UUID),我们可以使这每个消费者分别执行不同的方法,或者进行负载均衡等操作,可以使用此消息监听器实现;具体可百度;
使用转换器的目的是当传入不同的类型的数据(如json,类,PDF,图片等)时,在消息的接收方接收到时也总是以传入的类型接收结果对象;我们通过写入不同的转换器以达到此种效果。具体可百度。
注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效;生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等。
配置application.properties:
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
创建RabbitMQConfig.java(此步骤省略,参照以前代码)
创建生产者RabbitSender:
@Component
public class RabbitSender{
@Autowired
private RabbitTemplate rabbitTemplate;
// ack确认
final ConfirmCallback confirmCallback=new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData,boolean ack,String cause){
System.err.println("correlationData:"+ correlationData);
System.err.println("ack:"+ack);
if(!ack){
System.err.println("异常处理...");
}
}
};
// 消息发送后的返回
final ReturnCallback returnCallback=new RabbitTemplate.ReturnCallback(){
@Override
public void returnedMessage(org.springframwork.amqp.core.Message message, int replyCode,String replyText, String exchange,String routingKey){
System.err.println("return exchange: "+ exchange+",routingKey:"+routingKey);
}
};
public void send(Object message,Map<String,Object> properties) throws Exception{
MessageHeaders mhs=new MessageHeaders(properteis);
Message msg=MessageBuilder.createMessage(message,mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData cd=new CorrelationData("123456789"); //这里的id值一定要唯一。我们为了测试随便写的。用于生产环境可使用:id+时间戳等方式
rabbitTemplate.converAndSend("exchange-1","springboot.hello",msg);
}
}
ack确认中的if(!ack){}里面,我们可以实现自己的方法。比如做事务的时候,当消息发送不成功,即ack=false时,我们可以将消息发送不成功的状态录入数据库。做定时任务对数据库中这些发送不成功的消息进行消息重试(重新发送),以保证消息最终都能成功发送,不会漏发;而returnCallback这里的方法,当消息返回失败的时候,它能告诉我们消息发送失败的原因等。 CorrelationData这里是为了确定此消息的唯一性。当我们确定消息发送失败等,我们可以对此指定的消息进行操作。所以里面的id要保证全局唯一。
测试类:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests{
@Test
public void contextLoads(){}
@Autowired
private RabbitSender rabbitSender;
private static SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testSender1() throws Exception{
Map<String,Object> properties =new HashMap<>();
properties.put("number","12345");
properties.put("send_time",simpleDateFormat.format(new Date()));
rabbitSender.send("Hello RabbitMQ",properties);
}
}
这里的SimpleDateFormat是一个线程不安全的时间解析类,此处仅用于功能实现,生产环境使用请注意。
消费端核心配置:
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5;
其他要点:
@RabbitListener注解的使用
核心两个代码:
@RabbitListener(bindings=@QueueBinding(
value=@Queue(value="queue-1",durable="true"),
exchange=@Exchange(value="exchange-1",durable="true"),
type="topic",
ignoreDeclarationExceptions="true"),,key="springboot.*"))
@RabbitHandler
public void onMessage(Message message,Channel channel) throws Exception{]
由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。
配置文件写法,代码如下:
完整代码请参考上面一小节的发送端代码。
使用实体类接收和发送消息代码演示:
这里是引入生产者的发送消息的方法,直接将此对象传输;
SpringCloud ,这个全家桶框架在整个中小型互联网公司异常的火爆,那么相对应着,Spring Cloud Stream就渐渐的被大家所重视起来,这一节课主要来介绍Spring Cloud Stream 如何与RabbitMQ进行集成。
Spring Cloud Stream 整体架构核心概念图
Spring Cloud Stream 整体架构核心概念图
Barista接口:
Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息
@Output:输出注解,用于定义发送消息接口
@Input: 输入注解,用于定义消息的消费者接口
@StreamListener:用于定义监听方法的注解
使用Spring Cloud Stream 非常简单,只需要使用好这3个注解即可,在实现高性能消息的生产和消费的场景非常适合,但是使用Spring Cloud Stream 框架有一个非常大的问题就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失问题。
存在消息丢失的问题我们可以通过补偿机制进行解决;
消费端XML引入的核心依赖:
实体类(类名可以改的,不一定都叫这名,这是官方的名字)
消费端配置application.properties:
requeue-rejected 是否支持return; acknowledge-model=MANUAL (手动签收) ; recovery-interval 服务不稳定多少毫秒后进行重连 durable-subscription 是否启动持久化订阅; max-concurrenty=5 最大监听数
消费端监听消息代码:
这里如果换成Kafka,大部分代码也是不需要改变的。并且我们的发送端可以使用Kafka,接收端可以使用RabbitMQ这种非同种消息类型的消息。
发送消息测试:
Spring Cloud Stream 这里就相当于多了一个中间层,它将底层是Kafka或者RabbitMQ或者是其他的作为配置不侵入代码。消息的接收和发送都通过管道进行。在以后如果需要替换或者同时使用多种消息类型,都是可以的。
远距离通信和复制,所谓Shovel就是我们可以把消息进行不同数据中心的复制工作,我们可以跨地域的让两个mq集群互联。我们下面看一下Shovel架构模型;
图示:
在使用了shovel插件后,模型变成了近端同步确认,远端异步确认的方式,大大提高了订单确认速度,并且还能保证可靠性。
细节图示:
正常队列压力过大的时,会将订单复制到远端中心,在远端进行数据消费进行异步确认;用的不是特别多的原因是因为我们目前已经有了更好的远端模式,这个是比较早期的使用方式;
Shovel集群的配置,首先启动rabbitMQ插件,命令如下:
rabbitmq-plugins enable amqp-client
rabbitmq-plugins enable rabbitmq_shovel
步骤:
具体的可以百度。
Federation插件进行互相复制;
集群构建可参考百度
HAProxy是一款提供高可用性、负载均衡以及基于TCP(第四层)和HTTP(第七层)应用的代理软件。支持虚拟主机,它是免费、快速并且可靠的一种解决方案。HAProxy特别适用于那些负载特大的web站点,这些站点通常又需要会话保持或七层处理。HAProxy运行在时下的硬件上,完全可以支持数以万计的并发连接。并且它的运行模式使得它可以很简单安全的整合进您当前的架构中,同时可以保护你的web服务器不被暴露到网络上。
Haproxy性能最大化(一)
实战安装:
yum install gcc vim wget
wget http://www.haproxy.org/download/1.6/src/haproxy-1.6.5.tar.gz
cd /usr/local/software/
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
cd haproxy-1.6.5/
make TARGET=linux31 PREFIx=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
mkdir /etc/haproxy
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
touch /etc/haproxy/haproxy.cfg
我们对代理等的配置,可以在这里面进行配置; 通过
cd software/
会看到一个haproxy.cfg ,我们可以通过命令:
mv haproxy.cfg /etc/haproxy/
将配置移动 然后cd /etc/haproxy/
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
ps -ef | grep haproxy
localhost:8100/rabbitmq-stas
简介:
三个重要功能:
KeepAlived高可用原理:
什么是VRRP?
具体的安装可以百度
本章小结:本章我们掌握了RabbitMQ各种集群构建姿势,真正从零开始构建一个高可用的RabbitMQ集群,通过镜像队列+Haproxy+KeepAlived的架构进行构建!并且我们介绍了集群的节点宕机故障问题如何进行解决的5个解决方案!也学习了延迟插件的使用!
容灾问题:
资源扩展问题:
大集群拆分问题:
业界主流方案: 单元化架构方案(阿里,支付宝,饿了么,微信等)
- 使用灾备的思想,在同城“双活”的基础上,在异地部署一套灾备数据中心,每个中心都具有完备的数据处理能力,只有当主节点故障需要容灾的时候才会紧急启动备用数据中心;
- 优缺点图示:
![在这里插入图片描述](https://img-blog.csdnimg.cn/20191023214325480.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM3MTI4MDQ5,size_16,color_FFFFFF,t_70)
SET化架构设计:
流量路由:
中心集群:
单元化集群:
中间件(RPC、KV、MQ等):
数据同步:
SET化路由策略及其能力:
高效的本地化服务:
SET化架构图:
SET化架构流转图:
SET化重要的原则:
SET消息中间件架构实现(RabbitMQ双活):
使用此集群插件,发送一个消息到集群中,它可以进行转发复制到另外一个集群中;具体部署方式请百度;
本章小结:本章主要讲了互联网大长的SET化架构进衍,以及使用SET化架构能解决哪些问题,SET化架构的核心设计目标和重要原则,通过对RabbitMQ的SET化设计,也就是使用federation插件构建多活集群,实现多中心的数据同步!我们可以对大规模集群的部署有一个更可靠的解决方案!
一线大厂的MQ组件实现思路和架构设计方案
MQ组件实现功能点(一)
MQ组件实现的功能点(二)
MQ组件实现功能点(三)
不落库存储,不做消息的其他处理,直接拿到就消费;损失了可靠性,提高了性能
概述:
图示:
备注:比如生产端消息没有完全投递成功、或者消费端落库异常,导致消费端落库后缺少消息条目的情况
建议还是拆分成小消息,这样不用考虑顺序问题,多线程处理,然后接收到后再进行手动排序;
可靠性投递,就需要数据库;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。