赞
踩
如今说的大数据其实起源于谷歌在2004年发表的三篇论文:分布式文件系统 GFS、大数据分布式计算框架 MapReduce 和 NoSQL 数据库系统 BigTable。同时,在焦聚单机性能的同时,Google 开始考虑部署一个大规模的服务器集群来满足海量的数据存储以及计算的需求。
Lucene 开源项目的创始人 Doug Cutting 再开发开源搜索引擎 Nutch之后的2006 年,Doug Cutting 将大数据相关的功能从 Nutch 中分离了出来,然后启动了一个独立的项目专门开发维护大数据技术,这就是后来赫赫有名的 Hadoop,主要包括 Hadoop 分布式文件系统 HDFS 和大数据计算引擎 MapReduce。
但是MapReduce 进行大数据编程太麻烦了,于是便产生了一种脚本语言Hive(由Facebook发布),Hive 支持使用 SQL 语法来进行大数据计算,比如说你可以写个 Select 语句进行数据查询,然后 Hive 会把 SQL 语句转化成 MapReduce 的计算程序。大数据生态体系逐渐形成,其中包括:专门将关系数据库中的数据导入导出到 Hadoop 平台的 Sqoop;针对大规模日志进行分布式收集、聚合和传输的 Flume;MapReduce 工作流调度引擎 Oozie 等。同时还有Yarn最主流的资源调度系统。
一般说来,像 MapReduce、Spark 这类计算框架处理的业务场景都被称作批处理计算,因为它们通常针对以“天”为单位产生的数据进行一次计算,然后得到需要的结果,这中间计算需要花费的时间大概是几十分钟甚至更长的时间。因为计算的数据是非在线得到的实时数据,而是历史数据,所以这类计算也被称为大数据离线计算。
大数据领域,还有另外一类应用场景,它们需要对实时产生的大量数据进行即时计算,比如对于遍布城市的监控摄像头进行人脸识别和嫌犯追踪。这类计算称为大数据流计算,相应地,有 Storm、Flink、Spark Streaming 等流计算框架来满足此类大数据应用的场景。 流式计算要处理的数据是实时在线产生的数据,所以这类计算也被称为大数据实时计算。
同时,NoSQL 系统处理的主要也是大规模海量数据的存储与访问,所以也被归为大数据技术。 NoSQL 曾经在 2011 年左右非常火爆,涌现出 HBase、Cassandra 等许多优秀的产品,其中 HBase 是从 Hadoop 中分离出来的、基于 HDFS 的 NoSQL 系统。
大数据处理的主要应用场景包括数据分析、数据挖掘与机器学习。数据分析主要使用 Hive、Spark SQL 等 SQL 引擎完成;数据挖掘与机器学习则有专门的机器学习框架 TensorFlow、Mahout 以及 MLlib 等,内置了主要的机器学习和数据挖掘算法。
此外,大数据要存入分布式文件系统(HDFS),要有序调度 MapReduce 和 Spark 作业执行,并能把执行结果写入到各个应用系统的数据库中,还需要有一个大数据平台整合所有这些大数据组件和企业应用系统。
大数据系统都是分布式系统。我们需要大数据系统,就是因为普通的单机已经无法满足我们期望的性能了。那么作为一个分布式的数据系统,它就需要满足三个特性,也就是可靠性、可扩展性和可维护性。
一个系统,如果只记录一份数据,那么当硬件故障的时候就会遇到丢数据的问题,所以我们需要对数据做复制。而数据复制之后,以哪一份数据为准,又给我们带来了主从架构、多主架构以及无主架构的选择。
- 主从架构就是我们常见的Master和Slave架构,其模式有一主多从架构,其主从指尖的传输延迟会比较小,但是会产生很多dump现场,这样会加重主库的IO负载。比较常见的是数据库的主从架构,使用主从来做读写分离。
- 多主架构,每次修改发送到任一主节点上,并由这个主节点将修改同步到其他所有节点上。其优势:多主架构由就近数据中心主节点处理写入操作,并异步更新到其他数据中心,数据中心之间的网络可以用企业专线来降低延迟,对外提供写服务的能力可以根据主节点数量进行扩展。但是如果写操作在不同数据中心进行,可能会导致写入冲突的发生。可参考
- 无主架构,在多主架构下,某主节点同时向三个数据节点发出写入请求,其中两个正常节点执行了写入操作,崩溃的一个没有响应。但如果读请求发送到崩溃的节点,那么旧数据可能存在一定的可能性被返回。所以无主架构在读请求时需要把请求同时发送到多个数据节点,根据数据版本号来判断数据新旧。可参考1 可参考2
在最常见的主从架构里,我们根据复制过程,可以有同步复制和异步复制之分。同步复制的节点可以作为高可用切换的 Backup Master,而异步复制的节点只适合作为只读的 Shadow Master。
同步复制视同为:针对 master 的数据操作,都需要同样写到另外准备的这几台服务器上。只有当数据在 master 上操作成功,对应的操作记录刷新到硬盘上,并且这几个 Backup Master 的数据也写入成功,并把操作记录刷新到硬盘上,整个操作才会被视为操作成功。
异步复制:
只读的“影子 Master”,这些影子 Master 和Backup Master不同,master 写入数据并不需要等到影子 Master 也写入完成才返回成功。而是影子 Master 不断同步 master 输入的写入,尽可能保持追上 master 的最新状态。
在“大数据”的场景下,单个节点存不下所有数据,于是就有了数据分区。常见的分区方式有两种,第一种是通过区间进行分片,典型的代表就是 Bigtable,第二种是通过哈希进行分区,在大型分布式系统中常用的是一致性 Hash,典型的代表是 Cassandra。
对于可维护性其实可以总结为容错与恢复。为了确保我们不会因为部分网络的中断导致作出错误的判断,有一个非常著名的CAP算法。需要在一致性、可用性和分区容错性之间做权衡和选择。任何的可维护性的选择的主从架构、复制策略、分片策略,以及容错和恢复方案,都是根据实际的应用场景下对于 CAP 进行的权衡和选择。
1、一致性
在分布式环境下,一致性是指数据在多个副本之间能否保持一致的特性。在一致性的需求下,当一个系统在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一直的状态。
对于一个将数据副本分布在不同分布式节点上的系统来说,如果对第一个节点的数据进行了更新操作并且更新成功后,却没有使得第二个节点上的数据得到相应的更新,于是在对第二个节点的数据进行读取操作时,获取的依然是老数据(或称为脏数据),这就是典型的分布式数据不一致的情况。在分布式系统中,如果能够做到针对一个数据项的更新操作执行成功后,所有的用户都可以读取到其最新的值,那么这样的系统就被认为具有强一致性
2、可用性
可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果。这里的重点是"有限时间内"和"返回结果"。
"有限时间内"是指,对于用户的一个操作请求,系统必须能够在指定的时间内返回对应的处理结果,如果超过了这个时间范围,那么系统就被认为是不可用的。另外,"有限的时间内"是指系统设计之初就设计好的运行指标,通常不同系统之间有很大的不同,无论如何,对于用户请求,系统必须存在一个合理的响应时间,否则用户便会对系统感到失望。
"返回结果"是可用性的另一个非常重要的指标,它要求系统在完成对用户请求的处理后,返回一个正常的响应结果。正常的响应结果通常能够明确地反映出队请求的处理结果,即成功或失败,而不是一个让用户感到困惑的返回结果。
3、分区容错性
分区容错性约束了一个分布式系统具有如下特性:分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非是整个网络环境都发生了故障。
网络分区是指在分布式系统中,不同的节点分布在不同的子网络(机房或异地网络)中,由于一些特殊的原因导致这些子网络出现网络不连通的状况,但各个子网络的内部网络是正常的,从而导致整个系统的网络环境被切分成了若干个孤立的区域。需要注意的是,组成一个分布式系统的每个节点的加入与退出都可以看作是一个特殊的网络分区。
即使是上万台的分布式集群,最终还是要落到每一台单个服务器上完成数据的读写。那么在存储引擎上,关键的技术点主要包括三个部分。
在数据写入系统时,我们需要保障写入的数据是原子的、完整的。在传统的数据库领域,我们有 ACID 这样的事务特性,也就是原子性(Atomic)、一致性(Consistency)、隔离性(Isolation)以及持久性(Durability)。而在大数据领域,很多时候因为分布式的存在,我们常常会退化到一个叫做 BASE 的模型。BASE 代表着基本可用(Basically Available)、软状态(Soft State)以及最终一致性(Eventually Consistent)。
1、Basically Available-基本可用
分布式系统发生不可预知的故障时,允许损失部分可用性,如服务降级等。
2、Soft state-弱状态
分布式系统不同节点间某个时刻数据允许存在中间状态,不同节点的数据副本之间进行同步时可能存在时延,如主从同步。
3、Eventually consistent-最终一致
分布式系统不同节点的所有数据副本,在经过一段时间数据同步后,最终达到一致状态,即保证最终一致性,不保证实时一致性。
我们通常接触的常见中间件,如mysql、zookeeper、redis、elasticsearch等都是基于BASE理论建立的。
在单机上,一般会用都会使用预写日志(WAL)、快照(Snapshot)和检查点(Checkpoints)以及写时复制(Copy-on-Write)这些技术,来保障数据在单个节点的写入是原子的。可参考
要考虑到计算机硬件的特性,比如数据的顺序读写比随机读写快,在内存上读写比硬盘上快;也要考虑到我们在算法和数据结构中的时空复杂度,比如 Hash 表的时间复杂度是 O(1),B+ 树的时间复杂度是 O(logN)。分布式数据库最常使用的,其实是基于 LSM 树(Log-Structured Merge Tree)的 MemTable+SSTable 的解决方案。可参考
比较常见的有Thrift 二进制序列化,Parquet 或者 ORCFile 这样的列存储格式。
GFS的设计原则:
在这个设计原则下, GFS 是一个非常简单的单 Master 架构,但是这个 Master 其实有三种不同的身份,
分别是:
Master 的主要存储以下信息:文件和 chunk 的命名空间信息,也就是类似前面 /data/geektime/bigdata/gfs01 这样的路径和文件名;这些文件被拆分成了哪几个 chunk,也就是这个全路径文件名到多个 chunk handle 的映射关系;这些 chunk 实际被存储在了哪些 chunkserver 上,也就是 chunk handle 到 chunkserver 的映射关系。
Checkpoints 和操作日志、Backup Master、外部的监控程序,以及只读的影子 Master。在这个高可用场景下,Master 是唯一可以写入数据的节点。Backup Master 通过同步复制的方式从 master 同步数据,而影子 Master 则通过异步复制的方式来同步 master 的数据。
问题:为什么客户端不是一次将数据发给所有的chunkserver?
原因是网络的瓶颈问题:
我们要发送 1GB 的数据给 GFS,客户端的出口网络带宽有 100MB/ 秒,那么我们只需要 10 秒就能把数据发送完。但是因为三个 chunkserver 的数据都要从客户端发出,所以要 30s 才能把所有的数据都发送完,而且这个时候,三个 chunkserver 的网络带宽都没有用满,各自只用了 1/3,网络并没有被有效地利用起来。
问题:基于以下网络的架构你想怎么优化GFS的写入。
网络上的就近原则
客户端可以先把所有数据,传输给到网络里离自己最近的次副本 A,然后次副本 A 一边接收数据,一边把对应的数据传输给到离自己最近的另一个副本,也就是主副本。
原因:两台服务器如果在同一个机架上,它们之间的网络传输只需要通过接入层的交换机即可。在这种情况下,除了两台服务器本身的网络带宽之外,它们只会占用所在的接入层交换机的带宽。但是,如果两台服务器不在一个机架,乃至不在一个 VLAN 的情况下,数据传输就要通过汇聚层交换机,甚至是核心交换机了。而如果大量的数据传输,都是在多个不同的 VLAN 之间进行的,那么汇聚层交换机乃至核心交换机的带宽,就会成为瓶颈。
MapReduce 既是一个编程模型,又是一个计算框架。其编程模型只包含 Map 和 Reduce 两个过程,map 的主要输入是一对<Key, Value> 值,经过 map 计算后输出一对<Key, Value> 值;然后将相同 Key 合并,形成 <Key, Value>;再将这个<Key, Value>输入 reduce,经过计算输出零个或多个<Key, Value>对。
简而言之就是分治(Map:部分结果集),结果集汇总(Reduce)。但是需要注意的是,MapReduce计算模型延时很高,没法当成一个交互式系统来给使用。然后是多轮迭代问题。在 MapReduce 这个模型里,一个 MapReduce 就要读写一次硬盘,而且 Map 和 Reduce 之间的数据通信,也是先要落到硬盘上的。
中间那个过程,Map 函数的输出结果,会被整个 MapReduce 程序接手,进行一个叫做混洗的操作。混洗会把 Map 函数输出的所有相同的 Key 的 Value 整合到一个列表中,给到 Reduce 函数。并且给到 Reduce 函数的 Key,在每个 Reduce 里,都是按照 Key 排好序的。
MapReduce,它也是一个批量处理数据的框架,吞吐量(throughput)确实很大,但是延时(latency)和额外开销(overhead)也不小。
Bigtable 要解决什么问题?
拿最简单的分库分表举例,如果通过某个字段进行 %(模取)到定位的表,那么这个数据的划分可以认为是从设计之初就设计到了的。因为,如果我们需要对我们的数据集群进行扩容,那么需要搬运的数据则是十分庞大的。
Bigtable的目标,能够支撑百万级别随机读写 IOPS,并且伸缩到上千台服务器的一个数据库。但是光能撑起 IOPS 还不够。在这个数据量下,整个系统的“可伸缩性”和“可运维性”就变得非常重要。
第一个,是可以随时加减服务器,并且对添加减少服务器数量的限制要小,能够做到忙的时候加几台服务器,过几个小时峰值过去了,就可以把服务器降下来。第二个,是数据的分片会自动根据负载调整。某一个分片写入的数据多了,能够自动拆成多个分片来平衡负载。而如果负载大了,添加了服务器之后,也能很快平衡数据,让各个节点均匀承担压力。
小部分节点的故障,不应该影响整个集群的运行,我们的运维人员也不用急匆匆地立刻去恢复。集群自身也要有很强的容错能力,能够把对应的请求和服务,调度到其他节点去。
但同时Bigtable 也放弃了很多目标,其中有两个非常重要:第一个是放弃了关系模型,也不支持 SQL 语言;第二个,则是放弃了跨行事务,Bigtable 只支持单行的事务模型。
每一张 Bigtable 的表都特别简单,每一行就是一条数据:
其次便是十分重要的数据分区,BigTable采用自动去“分裂”(split)的方式来动态地进行分区。
整个数据表,会按照行键排好序,然后按照连续的行键一段段地分区。如果某一段行键的区间里,写的数据越来越多,占用的存储空间越来越大,那么整个系统会自动地将这个分区一分为二,变成两个分区。而如果某一个区间段的数据被删掉了很多,占用的空间越来越小了,那么我们就会自动把这个分区和它旁边的分区合并到一起。
在 Bigtable 里,是通过 Master 和 Chubby 这两个组件来完成这个任务的。这两个组件,加上每个分片提供服务的 Tablet Server,以及实际存储数据的 GFS,共同组成了整个 Bigtable 集群。
分区之后的每一片数据,在不同的分布式系统里有不同的名字,在 MySQL 里呢,我们一般叫做 Shard,Bigtable 里则叫做 Tablet。
Master、Chubby 和 Tablet Server 的用途Tablet Server 的角色最明确,就是用来实际提供数据读写服务的。一个 Tablet Server 上会分配到 10 到 1000 个 Tablets,Tablet Server 就去负责这些 Tablets 的读写请求,并且在单个 Tablet 太大的时候,对它们进行分裂。而哪些 Tablets 分配给哪个 Tablet Server,自然是由 Master 负责的,而且 Master 可以根据每个 Tablet Server 的负载进行动态的调度,也就是 Master 还能起到负载均衡(load balance)的作用。而这一点,也是 MySQL 集群很难做到的。这是因为,Bigtable 的 Tablet Server 只负责在线服务,不负责数据存储。实际的存储,是通过一种叫做 SSTable 的数据格式写入到 GFS 上的。也就是 Bigtable 里,数据存储和在线服务的职责是完全分离的。我们调度 Tablet 的时候,只是调度在线服务的负载,并不需要把数据也一并搬运走。
Master 的主要用途:
Chubby的主要用途:
为什么数据读写不需要 Master?
Chubby 帮我们保障了只有一个 Master,那么我们再来看看分区和 Tablets 的分配信息,这些信息也没有放在 Master。Bigtable 在这里用了一个很巧妙的方法,就是直接把这个信息,存成了 Bigtable 的一张 METADATA 表,而这张表在哪里呢,它是直接存放在 Bigtable 集群里面的,其实 METADATA 表自己就是一张 Bigtable 的数据表。
这其实有点像 MySQL 里面的 information_schema 表,也就是数据库定义了一张特殊的表,用来存放自己的元数据。不过,Bigtable 是一个分布式数据库,所以我们还要知道,这个元数据究竟存放在哪个 Tablet Server 里,这个就需要通过 Chubby 来告诉我们了。
客户端的具体查询流程如下:
为什么常用的 CSV 和 JSON 格式不可行?
csv格式:
csv的缺点是:
第一个是数据里面没有告诉我们数据类型是什么,我们只能根据字段的名称,以及查看少数几条数据来猜测。
第二个是很多数据用文本来保存有些浪费空间。
而JSON Schema对于每一条数据,我们不仅要存储数据,还要再存储一份字段名,占用的空间就更大了。
事实上,CSV 也好,JSON 也好,乃至 XML 也好,这些针对结构化数据进行编码主要想解决的问题是提升开发人员的效率,所以重视的是数据的“人类可读性”。因为在小数据量的情况下,程序员的开发效率是核心问题,多浪费一点存储空间算不了什么。但是在“大数据”的场景下,除了程序员的效率,存储数据本身的“效率”就变得非常重要了。
解决办法:
包含 IDL 并能向前和向后兼容的 Thrift 的 TBinaryProtocol(csv json二者结合)
通过 Schema 文件,定义出一个结构体,然后在里面列清楚字段的顺序、类型以及名称。写一个程序,能够解析这个 Schema 文件,然后自动生成可以根据结构体的 Schema 进行序列化和反序列化的代码。这个序列化和反序列化的代码是非常简单的,只要按照 Schema 里面出现的字段顺序,一个个对着字节数组去读或者写数据就好了。
但是如果数据结构会变,有什么方案可以既可以让老程序读出新格式的数据,新格式的程序又能读老格式的数据呢?
TBinaryProtocol 的实现方式很简单,那就是顺序写入数据的过程中,不仅会写入数据的值(field-value),还会写入数据的编号(field-id)和类型(field-type);读取的时候也一样。并且,在每一条记录的结束都会写下一个标志位。
struct SearchClick
{
1:string user_id,
2:string search_term,
3:i16 rank,
4:string landing_url,
// 5:i32 click_timestamp, deprecated 已废弃
6:i64 click_long_timestamp,
7:string ip_address
}
这样,在读取数据的时候,老版本的 v1 代码,看到自己没有见过的编号就可以跳过。新版本的 v2 代码,对于老数据里没有的字段,也就是读不到值而已,并不会出现不兼容的情况。
Delta Encoding,ZigZag 编码 +VQL 可变长数值表示,其实就是对位的进一步操作,可以对上面方案进行进一步优化。
跨语言 + 序列化 +RPC,使得 Thrift 解决了一个在“大数据领域”中很重要的问题,就是习惯于使用不同编程语言团队之间的协作问题。
先引入两个问题: Backup Master 和 Master 该怎么做到完全同步?在网络故障的情况下,集群里有两个 Master 的情况?
Chubby是一个粗粒度的分布式锁方案。
GFS 的 Master 是有一个同步复制的 Backup Master 的。所有在 Master 上的操作,都要同步在 Backup Master 上写入成功之后,才算真正写入完成。这句话说起来很容易,可是实际上并不容易做到。两阶段提交的过程其实非常直观,就是把数据的写入,拆分成了提交请求和提交执行这两个不同的阶段,然后通过一个协调者(Coordinator)来协调我们的 Master 和 Backup Master。
两阶段提交如下:
但是在这个两阶段提交的过程中,如果出现了硬件和网络故障,会发生什么事情呢?
如果是参与者发生了硬件故障,或者参与者和协调者之间的网络出现了故障。这个时候的硬件或者网络故障,就意味着参与者没有办法知道协调者到底想要继续推进事务,还是想要回滚。在这种情况下,参与者在硬件故障解决之后,会一直等待协调者给出下一步指令。
如果协调者之前已经收到了参与者的答应执行事务的响应,那么协调者会一直尝试重新联系参与者。
这样也就意味着,当硬件出现故障的时候,可能有一个参与者,已经在自己的节点上完成了事务的执行。但是另外一个参与者,可能要过很长一段时间,在硬件和网络恢复之后,才会完成事务。如果这两个参与者是 Master 和 Backup Master,那么在这段时间里,Master 和 Backup Master 之间的数据就是不一致的。
不过,如果外部所有和参与者的沟通,都需要通过协调者的话,协调者完全可以在 Backup Master 还没有恢复的时候,都告知外部的客户端等一等,之前的数据操作还没有完成。
在两阶段提交的逻辑里,是通过一个位居中间的协调者来对外暴露接口,并对内确认所有的参与者之间的消息是同步的。不过,两阶段提交的问题也很明显,那就是两阶段提交虽然保障了一致性(C),但是牺牲了可用性(A)。无论是协调者,还是任何一个参与者出现硬件故障,整个服务器其实就阻塞住了,需要等待对应的节点恢复过来。
将提交请求阶段再拆成两步,将已经提交转换成是否可以提交,已提交。
第一步,我们不用让各个参与者把执行的动作都准备好,也就是不用去写什么 undo logs 或者 redo logs,而是先判断一下这个事务是不是可以执行,然后再告诉协调者。这一步的请求叫做 CanCommit 请求。
第二步,当协调者发现大家都说可以执行的时候,再发送一个预提交请求,在这个请求的过程里,就和两阶段提交的过程中一样。所有的参与者,都会在这个时候去写 redo logs 和 undo logs。这一步的请求呢,叫做 PreCommit 请求。
在 CanCommit 请求和 PreCommit 请求阶段,所有参与者都可以告诉协调者放弃事务,整个事务就会回滚。如果出现网络超时之类的问题,整个事务也会回滚。不过,把整个提交请求的阶段拆分成 CanCommit 和 PreCommit 两个动作,缩短了各个参与者发生同步阻塞的时间。
原先无论任何一个参与者决定不能执行事务,所有的参与者都会白白先把整个事务的 redo logs 和 undo logs 等操作做完,并且在请求执行阶段还要再做一次回滚。而在新的三阶段提交场景下,大部分不能执行的事务,都可以在 CanCommit 阶段就放弃掉。
这意味着所有的参与者都不需要白白做无用功了,也不需要浪费很多开销去写 redo logs 和 undo logs 等等。另外,在最后的提交执行阶段,三阶段提交为了提升系统的可用性也做了一点小小的改造。
在进入最后的提交执行阶段的时候,如果参与者等待协调者超时了,那么参与者不会一直在那里死等,而是会把已经答应的事务执行完成。这个方式,可以提升整个系统的可用性,在出现一些网络延时、阻塞的情况下,整个事务仍然会推进执行,并最终完成。这个是因为,进入到提交执行阶段的时候,至少所有的参与者已经都在 PreCommit 阶段答应执行事务了。
但是,在一种特殊的情况下,三阶段提交带来的问题会比二阶段更糟糕。这种情况是这样的:
三阶段提交,就是为了可用性(A),牺牲了一致性(C)。
三阶段提交,其实就是在出现网络分区的情况下,仍然尝试执行事务。同时,又为了减少网络分区下,出现数据不一致的情况,选择拆分了提交请求。把提交请求变成了一个小开销的 CanCommit,和一个大开销的 PreCommit。
而为了保障线性一致性,或者说系统的可线性化,我们必须让主从节点之间是同步复制的。而要做到高可用的同步复制,我们就需要 Paxos 这样的共识算法。
Paxos需要解决的问题是在两阶段提交提交的基础上增加多个协调者,而协调者之间的问题:操作顺序的错乱。
多个协调者之间没有办法相互协调,达成一个两个操作在顺序上的共识。
在 Paxos 算法里,我们把每一个要写入的操作,称之为提案(Proposal)。接受外部请求,要尝试写入数据的服务器节点,称之为提案者(Proposer),比如说,我们可以让一组服务器里面有 5 个提案者,可以接受外部的客户端请求。
在 Paxos 算法里,并不是提案者一旦接受到客户端的请求,就决定了接下来的操作和结果的,而是有一个异步协调的过程,在这个协调过程中,只有获得多数通过(accept)的请求才会被选择(chosen)。这也是为什么,我们通常会选择 3 个或者 5 个节点这样的奇数数字,因为如果是偶数的话,遇到 2:2 打平这样的事情,我们就没法做出判断了。
这个投票机制也是 Quorum 这个名字的由来,因为 Quorum 在英文里的意思就是法定人数。
一旦达到了过半数,那么对应的请求就被通过了。既然我们的提案者已经准备好 5 个节点了,我们不妨就复用这 5 个节点,让这 5 个节点也作为 Quorum,来承担一个叫做接受者(Acceptor)的角色。
首先是每一个请求,我们都称之为一个“提案”。然后每个提案都有一个编号,这个编号由两部分组成。高位是整个提案过程中的轮数(Round),低位是我们刚才的服务器编号。
每个服务器呢,都会记录到自己至今为止看到过,或者用到过的最大的轮数。那么,当某一台服务器,想要发起一个新提案的时候,就要用它拿到的最大轮数加上 1,作为新提案的轮数,并且把自己的服务器编号拼接上去,作为提案号发放出去。并且这个提案号必须要存储在磁盘上,避免节点在挂掉之后,不知道最新的提案号是多少。
通过这个方式,我们就让这个提案号做到了两点:
那么,当提案者收到一条来自客户端的请求之后,它就会以提案者的身份发起提案。提案包括了前面的提案号,我们把这个提案号就叫做 M。这个提案会广播给所有的接受者,这个广播请求被称为 Prepare 请求。
而所有的 Acceptor 在收到提案的时候,会返回一个响应给提案者。这个响应包含的信息是这样的:
这样一个来回,就称之为 Paxos 算法里的 Prepare 阶段。要注意,这里的接受者只是返回告知提案者信息,它还没有真正接受请求。这个过程,本质上是提案者去查询所有的接受者,是否已经接受了别的提案。
当提案者收到超过半数的响应之后呢,整个提案就进入第二个阶段,也称之为 Accept 阶段。提案者会再次发起一个广播请求,里面包含这样的信息:
第一种情况,是之前接受者已经接受过值了。那么这里的值,是所有接受者返回过来,接受的值当中,提案号最大的那个提案的值。也就是说,提案者说,既然之前已经做出决策了,那么我们就遵循刚才的决策就好了。
而第二种情况,如果所有的提案者返回的都是 NULL,那么这个请求里,提案者就放上自己的值,然后告诉大家,请大家接受我这个值。
提案者还是会等待至少一半的接受者返回的响应。如果其中有人拒绝,那么提案者就需要放弃这一轮的提案,重新再来:生成新的提案号、发起 Prepare 请求、发起 Accept 请求。而当超过一半人表示接受请求的时候,提案者就认为提案通过了。当然,这个时候我们的提案虽然没有变,但是提案号已经变了。而当没有人拒绝,并且超过一半人表示接受请求的时候,提案者就认为提案通过了。
在 Paxos 算法这个过程中,其实一直在确保一件事情,就是所有节点,需要对当前接受了哪一个提案达成多数共识。
如果有多个 Proposer 同时想要向这个一致性模块写入一条日志,那么最终只会有一条会被成功写入,其余的提案都会被放弃。多个并发在多个 Proposer 上发生的写入请求,互相之间需要去竞争一次成功提案的机会。
缺点: 开销太大了。无论是否系统里面出现并发的情况,任何一个共识的达成,都需要两轮 RPC 调用。而且,所有的数据写入,都需要在所有的接受者节点上都写入一遍。
虽然 Paxos 算法帮助我们解决了单点故障,并且在没有单点的情况下,实现了共识算法,确保所有节点的日志顺序是相同的。但是,原始的 Paxos 算法的性能并不好。只是简单地写入一条日志,我们就可能要解决多个 Proposer 之间的竞争问题,有可能需要有好几轮的网络上的 RPC 调用。
如果我们往一个数据库同步写入日志都要通过 Paxos 算法,那么无论我们怎么优化,性能都是跟不上的。根本原因在于,在 Paxos 算法里,一个节点就需要承接所有的数据请求。虽然在可用性上,我们没有单点的瓶颈了,但是在性能上,我们的瓶颈仍然是单个节点。
分布式事务的一致性(Consistency),要从数据库事务ACID的特性之一,一致性去理解,指的是不同数据不会产生矛盾。分布式共识(Consensus),分布式系统的各个部分就按何种顺序处理数据达成一致意见的过程。也就是说一致性是结果,共识是达成一致的过程。可串行化(Serializable)是数据库事务中的概念,指的是事务被顺序处理,可线性化(Linearizability)是分布式中的概念,读操作可以读到最新写入的数据。以两次读取操作为例,可以看到Serializable和Linearizability的区别,在Serializable的要求下,两次读取操作需要顺序执行,在Linearizability的要求下,则可以并发执行。
“共识”并不需要在每一个操作、每一条日志写入的时候发生,我们只需要有一个“共识”,确认哪一个是 Master 就好了。
在 Chubby 这个系统里,它其实针对 Paxos 做了封装,把对外提供的接口变成一个锁。这样,Chubby 就变成了一个通用的分布式锁服务,而不是一个 Paxos 的一致性模块。在锁服务下达成的共识,就不是谁是 Master 了,而是哪一台服务器持有了 Master 的锁。对于应用系统来说,谁持有 Master 的锁,我们就认为这台服务器就是 Master。
Chubby 这个锁服务,是一个粗粒度的锁服务。所谓粗粒度,指的是外部客户端占用锁的时间是比较长的。比如说,我们的 Master 只要不出现故障,就可以一直占用这把锁。但是,我们并不会用这个锁做很多细粒度的动作,不会通过这个分布式的锁,在 Bigtable 上去实现一个多行数据写入的数据库事务。
Chubby 并不是提供一个底层的 Paxos 算法库,然后让所有的 GFS、Bigtable 等等,基于 Paxos 协议来实现数据库事务。而是把自己变成了一个分布式锁服务,主要解决 GFS、Bigtable 这些系统的元数据的一致性问题,以及容错场景下的灾难恢复问题。
在 Chubby 里,它自己的多个节点,会先通过“共识”算法,确认一个 Master 节点。这个 Master 节点,会作为系统中唯一的一个提案者(Proposer),所有对于 Chubby 的写入数据的请求,比如获取某个锁,都会发送到这个 Master 节点,由它作为提案者发起提案,然后所有节点都会作为接受者来接受提案达成共识。
只有一个提案者带来的好处就是,大部分时间,我们不太会因为两个 Proposer 之间竞争提案,而导致需要很多轮协商才能达成一致的情况。
对于 Chubby 的整个服务器端来说,我们可以把它看成一个三层的系统。最底层,是一个 Paxos 协议实现的同步日志复制的系统,也就是我们上一讲所说的状态机复制的系统。上面一层,就是通过这个状态机实现的数据库了,Google 是直接采用了 BerkeleyDB 作为这个数据库。换句话说,Chubby 是通过 Paxos 在多个 BerkeleyDB 里,实现数据库的同步复制。在 BerkeleyDB 之上,才是由 Chubby 自己实现的锁服务。
对于数据写入的请求,Master 会作为刚才我们说过的提案者,在所有的 Chubby 服务器节点上通过 Paxos 算法进行同步复制。而对于读请求,Master 直接返回本地数据就好,因为所有服务器节点上的数据是有共识的。
Chubby 对外封装的访问接口,是一个类似于 Unix 文件系统的接口。使用这个形式,同样也降低了使用 Chubby 的用户的门槛。毕竟每个工程师都熟悉用 ls 命令,去查询目录下的子目录和文件列表。Chubby 里的每一个目录或者文件,都被称之为一个节点(node)。外部应用所使用的分布式“锁”,其实就是锁在这个节点上。哪个客户端获得了锁,就可以向对应的目录或者文件里面写入数据。比如谁是真正的 Master,就是看谁获得了某个特定的文件锁。(有点像zookeeper)。
举个例子,我们可以定义 /gfs/master 这个命名空间,就用来存放 Master 的相关信息。这样,Master 服务器会通过 RPC 锁住这个文件,然后往里面写下自己的 IP 地址以及其他相关的元数据就好了。而其他客户端在这个时候,就无法获得这个锁,自然也就无法把 Master 改成自己。所有想要知道谁是 Master 的客户端,就只需要去查询 /gfs/master 这个文件就行。
作为分布式锁,客户端去获取的锁都是有时效的,也就是它只能占用这个锁一段时间。这个和我们前面提到的 Chubby 的 Master 的“租约”原理类似,主要是为了避免某个客户端获取了锁之后,它因为网络或者硬件原因下线了。
这样乍一听起来,我们只要给锁的时间设置一个时效就好了。不过,一旦涉及到不可靠的网络,事情就没有那么简单了。
解决办法:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。