赞
踩
AB应用不在互相依赖
流量达到高峰的时候,通常使用限流算法来控制流量涌入系统,避免系统被击瘫,但是这种方式损失了一部分请求
此时可以使用消息中间件来缓冲大量的请求,匀速消费,当消息队列中堆积消息过多时,我们可以动态上线增加消费端,来保证不丢失重要请求。
消息中间件可以把各个模块中产生的管理员操作日志、用户行为、系统状态等数据文件作为消息收集到主题中
数据使用方可以订阅自己感兴趣的数据内容互不影响,进行消费
跨语言
broker
Broker面向producer和consumer接受和发送消息
向nameserver提交自己的信息
是消息中间件的消息存储、转发服务器。
每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
Broker高可用,可以配成Master/Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。
一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义BrokerId为0表示Master,非0表示Slave
Master多机负载,可以部署多个broker
每个Broker与nameserver集群中的所有节点建立长连接,定时注册Topic信息到所有nameserver。
消息的生产者
通过集群中的其中一个节点(随机选择)建立长连接,获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳
消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。
注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点
nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时想nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除
nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用,
NameServer集群间互不通信,没有主备的概念
nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化
为什么不用zookeeper?:rocketmq希望为了提高性能,CAP定理,客户端负载均衡
opic是一个逻辑上的概念,实际上Message是在每个Broker上以Queue的形式记录。
对应到JMS中的topic实现是由客户端来完成的
consumer.setMessageModel(MessageModel.BROADCASTING);
消息消费模式由消费者来决定,可以由消费者设置MessageModel来决定消息模式。
消息模式默认为集群消费模式
分为集群模式和广播模式两种
- //用法
- consumer.setMessageModel(MessageModel.BROADCASTING);
- consumer.setMessageModel(MessageModel.CLUSTERING);
-
- //源码
- public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
- private MessageModel messageModel = MessageModel.CLUSTERING;
-
- //......
-
- public void setMessageModel(MessageModel messageModel) {
- this.messageModel = messageModel;
- }
- }
-
- public enum MessageModel {
- /**
- * broadcast
- */
- BROADCASTING("BROADCASTING"),
- /**
- * clustering
- */
- CLUSTERING("CLUSTERING");
-
- private String modeCN;
-
- MessageModel(String modeCN) {
- this.modeCN = modeCN;
- }
-
- public String getModeCN() {
- return modeCN;
- }
- }

集群消息是指集群化部署消费者
当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。
特点
每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
在消息重投时,不能保证路由到同一台机器上
消费状态由broker维护
当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
特点
消费进度由consumer维护
保证每个消费者消费一次消息
消费失败的消息不会重投
消息发送中进入同步等待状态,可以保证消息投递一定到达
想要快速发送消息,又不想丢失的时候可以使用异步消息
- //用法
- producer.send(message,new SendCallback() {
-
- public void onSuccess(SendResult sendResult) {
- // TODO Auto-generated method stub
- System.out.println("ok");
- }
-
- public void onException(Throwable e) {
- // TODO Auto-generated method stub
- e.printStackTrace();
- System.out.println("err");
- }
- });
只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
- //用法
- producer.sendOneway(message);
可以多条消息打包一起发送,减少网络传输次数提高效率。
producer.send(Collection c)
方法可以接受一个集合 实现批量发送
- //源码
- public SendResult send(
- Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.defaultMQProducerImpl.send(batch(msgs));
- }
批量消息要求必要具有同一topic、相同消息配置
不支持延时消息
建议一个批量消息最好不要超过1MB大小
如果不确定是否超过限制,可以手动计算大小分批发送
可以通过TAG和SQL两种方式进行消息过滤
- //源码
- public class MessageSelector {
-
- private String type;
-
- private String expression;
-
- private MessageSelector(String type, String expression) {
- this.type = type;
- this.expression = expression;
- }
-
- public static MessageSelector bySql(String sql) {
- return new MessageSelector(ExpressionType.SQL92, sql);
- }
-
- public static MessageSelector byTag(String tag) {
- return new MessageSelector(ExpressionType.TAG, tag);
- }
-
- public String getExpressionType() {
- return type;
- }
-
- public String getExpression() {
- return expression;
- }
- }
-
- public class ExpressionType {
-
- public static final String SQL92 = "SQL92";
-
- public static final String TAG = "TAG";
-
- public static boolean isTagType(String type) {
- if (type == null || "".equals(type) || TAG.equals(type)) {
- return true;
- }
- return false;
- }
- }

在Producer中使用Tag:
Message msg = new Message("TopicTest","TagA" ,("Hello RocketMQ " ).getBytes(RemotingHelper.DEFAULT_CHARSET));
在Consumer中订阅Tag:
consumer.subscribe("TopicTest", "TagA||TagB");// * 代表订阅Topic下的所有消息
消费者将收到包含TAGA或TAGB或TAGB的消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。 在这种情况下,您可以使用SQL表达式筛选出消息.
在broker.conf
中添加配置
enablePropertyFilter=true
启动broker 加载指定配置文件
../bin/mqbroker -n 192.168.150.113:9876 -c broker.conf
随后在集群配置中可以看到
实例
- MessageSelector selector = MessageSelector.bySql("order > 5");
- consumer.subscribe("xxoo3", selector);
RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.
>
, >=
, <
, <=
, BETWEEN
, =
;=
, <>
, IN
;IS NULL
或者 IS NOT NULL
;AND
, OR
, NOT
;常量类型是:
NULL
, 特殊常数;TRUE
或FALSE
;RocketMQ使用messageDelayLevel可以设置延迟投递
默认配置为
messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
在broker.conf
中添加配置
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
时间单位支持:s、m、h、d,分别表示秒、分、时、天;
发送消息时设置
- //用法
- message.setDelayTimeLevel(1);
队列先天支持FIFO模型,单一生产和消费者下只要保证使用MessageListenerOrderly
监听器即可
顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。
并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。
那么只要顺序的发送,再保证一个线程只去消费一个队列上的消息,那么他就是有序的。
跟普通消息相比,顺序消息的使用需要在producer的send()方法中添加MessageQueueSelector接口的实现类,并重写select选择使用的队列,因为顺序消息局部顺序,需要将所有消息指定发送到同一队列中。
保证有序参与因素
1. 同一个Topic
2. 同一个queue
3. 发消息的时候一个线程去发送消息
4. 消费的时候 一个线程 消费一个queue里的消息
5. 多个queue 只能保证单个queue里的顺序
实现类有三个,哈希随机和一个未完全实现的,可以手动将消息放进Topic的某一个队列中
- //源码
- //消息队列选择器
- public interface MessageQueueSelector {
- MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
- }
-
- //三个实现类
-
- //哈希
- public class SelectMessageQueueByHash implements MessageQueueSelector {
-
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- int value = arg.hashCode();
- if (value < 0) {
- value = Math.abs(value);
- }
-
- value = value % mqs.size();
- return mqs.get(value);
- }
- }
-
- //随机
- public class SelectMessageQueueByRandom implements MessageQueueSelector {
- private Random random = new Random(System.currentTimeMillis());
-
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- int value = random.nextInt(mqs.size());
- return mqs.get(value);
- }
- }
-
- //未实现
- public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
- private Set<String> consumeridcs;
-
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- return null;
- }
-
- public Set<String> getConsumeridcs() {
- return consumeridcs;
- }
-
- public void setConsumeridcs(Set<String> consumeridcs) {
- this.consumeridcs = consumeridcs;
- }
- }

MessageListener对象用于接收异步传递的消息
- //源码
- public interface MessageListener {
- }
-
- //顺序消费
- //用于按顺序接收异步传递的消息。一个队列,一个线程
- public interface MessageListenerOrderly extends MessageListener {
-
- ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
- final ConsumeOrderlyContext context);
- }
-
- //并发消费
- //用于同时接收异步传递的消息
- public interface MessageListenerConcurrently extends MessageListener {
-
- ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
- final ConsumeConcurrentlyContext context);
- }

- //发送端
- public class Producer02 {
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("mq02");
- producer.setNamesrvAddr("192.168.1.9:9876");
- producer.start();
-
- for (int i = 0; i < 20; i++) {
- Message message = new Message("myTopic02", ("hi! " + i).getBytes());
- //实现的消息队列选择器有三种: SelectMessageQueueByRandom, SelectMessageQueueByHash, SelectMessageQueueByMachineRoom
- producer.send(message, new MessageQueueSelector() {
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- MessageQueue messageQueue = mqs.get((Integer) arg);
- return messageQueue;
- }
- }, 1, 5000);
- }
- }
- }
-
- //接收端
- public class Consumer02 {
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mq02");
- consumer.setNamesrvAddr("192.168.1.9:9876");
- consumer.subscribe("myTopic02", "*");
-
- //设置最大和最小消费线程数为1
- consumer.setConsumeThreadMax(1);//默认大小为20
- consumer.setConsumeThreadMin(1);//默认大小为20
-
- //两种消费模式
- /*consumer.registerMessageListener(new MessageListenerConcurrently() {//并发消费
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- //默认状态有两种: RECONSUME_LATER 和 CONSUME_SUCCESS
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });*/
- consumer.registerMessageListener(new MessageListenerOrderly() {//顺序消费
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- for (MessageExt msg : msgs) {
- System.out.println("msg = " + new String(msg.getBody()));
- }
- //默认状态有四种 SUCCESS, ROLLBACK, COMMIT, SUSPEND_CURRENT_QUEUE_A_MOMENT;
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- consumer.start();
- }
- }

默认超时时间
- //Timeout for sending messages.
- private int sendMsgTimeout = 3000;
-
- // 异步发送时 重试次数,默认 2
- producer.setRetryTimesWhenSendAsyncFailed(1);
-
- // 同步发送时 重试次数,默认 2
- producer.setRetryTimesWhenSendFailed(1);
-
- // 是否向其他broker发送请求 默认false
- producer.setRetryAnotherBrokerWhenNotStoreOK(true);
消费超时,单位分钟
consumer.setConsumeTimeout()
发送ack,消费失败
RECONSUME_LATER
只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试
重投使用messageDelayLevel
默认值
messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原子性
RocketMQ 4.3+提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致
Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。
超时:如果超过回查次数,默认回滚消息
- public interface TransactionListener {
-
- //当发送事务性prepare(half)消息成功时,将调用此方法以执行本地事务
- LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
-
- //没有响应准备(half)消息时。broker将发送检查消息以检查事务状态,并将调用此方法以获取本地事务状态
- LocalTransactionState checkLocalTransaction(final MessageExt msg);
- }
半消息发送成功触发此方法来执行本地事务
broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
- public enum LocalTransactionState {
- COMMIT_MESSAGE,
- ROLLBACK_MESSAGE,
- UNKNOW,
- }
LocalTransactionState.COMMIT_MESSAGE
执行事务成功,确认提交
LocalTransactionState.ROLLBACK_MESSAGE
回滚消息,broker端会删除半消息
LocalTransactionState.UNKNOW
暂时为未知状态,等待broker回查
- public class TransactionProducer {
- public static void main(String[] args) throws MQClientException {
- TransactionMQProducer producer = new TransactionMQProducer("xoxogp");
- producer.setNamesrvAddr("192.168.0.104:9876");
- //回调
- producer.setTransactionListener(new TransactionListener() {
- public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- //执行本地事务
- System.out.println("******executeLocalTransaction******");
- System.out.println("msg = " + new String(msg.getBody()));
- System.out.println("msg = " + msg.getTransactionId());
- /**
- * 事务方法写这里,同步执行
- * a()
- * b()
- * c()
- * d()
- */
- try {
- // 业务
- } catch (Exception e) {
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }
- //真正发出去的数据 可用
- return LocalTransactionState.COMMIT_MESSAGE;
- }
-
- public LocalTransactionState checkLocalTransaction(MessageExt msg) {
- //Broker 端回调,检察事务
- System.out.println("******checkLocalTransaction******");
- System.out.println("msg = " + new String(msg.getBody()));
- System.out.println("msg = " + msg.getTransactionId());
- return LocalTransactionState.COMMIT_MESSAGE;
- }
- });
- producer.start();
- TransactionSendResult sendResult = producer.sendMessageInTransaction(
- new Message("xxxooo001", "测试!这是事务消息".getBytes()), null);
- System.out.println("sendResult = " + sendResult);
- System.out.println("已经停机");
- }
- }

在消费端,我们可以视情况来控制消费过程
DefaultMQPushConsumer 由系统自动控制过程,
DefaultMQPullConsumer 大部分功能需要手动控制
在集群消费模式下(clustering)
相同的group中的每个消费者只消费topic中的一部分内容
group中的所有消费者都参与消费过程,每个消费者消费的内容不重复,从而达到负载均衡的效果。
使用DefaultMQPushConsumer,新启动的消费者自动参与负载均衡。
短轮询
client不断发送请求到server,每次都需要重新连接
client发送请求到server,server有数据返回,没有数据请求挂起不断开连接
Consumer -> Broker RocketMQ采用的长轮询建立连接
consumer的处理能力Broker不知道
直接推送消息 broker端压力较大
采用长连接有可能consumer不能及时处理推送过来的数据
pull主动权在consumer手里
交互过程:
1. Client->Server,有没有消息,有,返回,一次长轮询结束
2. Client->Server,有没有消息,没有,挂起,
Server有消息了,推给Client,一次长轮询结束
RocketMQ 使用文件系统持久化消息。性能要比使用DB产品要高。
文件写入速度 顺序读写:3G左右 随机读写2G
很多使用文件系统存储的高性能中间件都是用了零拷贝技术来发送文件数据,比如Nginx
所以为了使用零拷贝技术,RocketMQ的文件存储大小默认每个1G,超过1G会重新建立一个新文件
存储消息的详细内容,按照消息收到的顺序,所有消息都存储在一起。每个消息存储后都会有一个offset,代表在commitLog中的偏移量。
默认配置 MessageStoreConfig
核心方法
默认大小 1G
- // CommitLog file size,default is 1G
- private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
通过消息偏移量建立的消息索引
针对每个Topic创建,消费逻辑队列,存储位置信息,用来快速定位CommitLog中的数据位置
启动后会被加载到内存中,加快查找消息速度
以Topic作为文件名称,每个Topic下又以queue id作为文件夹分组
默认大小
- // ConsumeQueue extend file size, 48M
- private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;
消息的Key和时间戳索引
默认文件会存储在家目录下/root/store/
- storePathConsumeQueue=/root/store/consumequeue
- storePathIndex=/root/store/index
以json格式存储消费信息
在CommitLog初始化时,判断配置文件 加载相应的service
- if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
- this.flushCommitLogService = new GroupCommitService();
- } else {
- this.flushCommitLogService = new FlushRealTimeService();
- }
- // Determines whether there is sufficient free space
- if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
- this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
- // 1 TOTALSIZE
- this.msgStoreItemMemory.putInt(maxBlank);
- // 2 MAGICCODE
- 不够放下一个消息的时候,用魔术字符代替
- this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
- // 3 The remaining space may be any value
- // Here the length of the specially set maxBlank
- final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
- byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
- return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
- queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
- }
消息被Broker写入磁盘后再给producer响应
消息被Broker写入内存后立即给producer响应,当内存中消息堆积到一定程度的时候写入磁盘持久化。
用来提供服务的注册发现的
NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能
Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费
NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息
NameServer实例时间互不通信,这本身也是其设计亮点之一,即允许不同NameServer之间数据不同步(像Zookeeper那样保证各节点数据强一致性会带来额外的性能消耗)
1. 没状态
这个里面存的信息只在内存中存储,少量的信息会在这个nameServer中做持久化,这些少量的信息单指在执行顺序消费的时候,会把顺序消费的信息给持久化到nameServer上
2.高可用
在组建nameServer集群,高可用集群的情况下,和ZooKeeper做对比的话,ZooKeeper多节点之间是有互通的,ZooKeeper是有状态的,所谓的有状态,它会存储一些状态信息,每台机器上的状态信息,可能一样也可能不一样,这些数据信息会同步,每一个节点之间存储的数据是一样的在ZooKeeper中,节点和节点之间会通讯
nameServer是不会通讯的,之间没有互联的感觉,这样就起到了高可用的作用,同时也起到了高性能的作用,因为在数据同步的时候,会执行更多的业务逻辑,这个注册中心的节点就会变得更简单了
3. 高性能
4. 数据会有不一致的情况
这个数据存的是broker在启动的时候,初始化自己,这个broker会向nameServer提交一些信息,nameServer会提供一个列表,这个列表就在nameServer中存着的,这个Producer要找这个Broker的话,通过这个nameServer去找,这是他的基本的逻辑,这个nameServer会包含broker的IP和Topic的信息
在线上环境下,这个broker不可能只有一个,最少是两个,提供主从复制,作为高可用;
如果数据量比较大的话,或者Topic的数量比较多的话,或者Topic里面的Queue的量比较多的话,它会分布在不同的Broker中,实现这种分片式的负载
多台nameServer,如何保证每一台nameServer中的数据都差不多一致呢?
每一个Broker都定时向所有的nameServer发送心跳,在每一个nameServer上维持每一个Broker的连接信息
1. 网卡,吞吐量有上限
2. 数据: 数据会持久化到磁盘,如果磁盘坏了
3. 高可用
只有一个 Master节点
优点:配置简单,方便部署
缺点:这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。多 Master 多 Slave 模式,异步复制
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响
每个 Master 配置一个 Slave,有多对Master-Slave, HA,采用异步复制方式,主备有短暂消息延迟,毫秒级。
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点: Master 宕机,磁盘损坏情况,会丢失少量消息。
每个 Master 配置一个 Slave,有多对Master-Slave, HA采用同步双写方式,主备都写成功,向应用返回成功。
优点:数据与服务都无单点, Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。