赞
踩
总括:
因为bbolt数据库的内容中:key=(revision,version),value=(elemkey,elemvalue),treeIndex中key=elemkey,value=[]generation,而generation包含一个key的所有版本,也就是说treeIndex中的value就是bbolt中的key。因为treeIndex中包含所有的elemkey,所以压缩的思路就是先遍历整个treeIndex来进行压缩:没有被删除的elemkey就用最后一次写入的值来覆盖,然后把该elemkey对应的(revision,version)作为key保存到一个map keep中,被删除的key则不会保存到该map keep中,然后遍历数据库中所有key,如果该key即(revision,version)不在map keep中,那么就从数据库中删除该key,如果在,则跳过,然后就完成了整个压缩操作。总结就是compact一共两步,第一步压缩treeIndex,第二步压缩数据库。
一点杂记:
1:压缩treeIndex和压缩数据库是两个分开的操作。
2:除了compact(defrag操作不知道)操作,所有其他的修改操作比如put/delete都是一个只增不减的过程。
3:put/delete操作也分两步,这两步也是分开的。第一步是往treeIndex中增加数据。第二步是把事务提交到txnbuffer。
4:compact对数据库的修改和put/delete一样,都是把操作丢到batchTxnBuffere就认为成功了
5:读写可以并发,即一边读数据库,一边把修改操作丢到batchTxnBuffere,但是在提交batchTxnBuffere之前会等待所有的读完成之后才会继续提交
debug流程:
1:启动参数添加--auto-compaction-retention=1
2:修改代码,把等待间隔缩小,否则有些流程可能不会走到或者要隔很久才会走到,因为他内部单位是小时
流程:
他是单独开了一个线程每隔一段时间就compact一下
etcdserver.NewServer #创建一个etcdserver if num := cfg.AutoCompactionRetention; num != 0: #需要配置auto-compaction-retention才会开启compact,默认是不开启 v3compactor.New #创建一个compact对象 v3compactor.Periodic.Run #启动compact线程,内部一个死循环,每隔一段时间就执行一次compaction for{ #定时循环 sleep mvcc.readView.Rev #获取compact开始时etcd系统当前最新的版本号即currentRev,保存为变量compactRev #currentRev+1就是下一个待提交的版本号 #不知道为什么他还要专门创建一个readTx,这么麻烦,为什么不直接读这个字段就行了? etcdserver.EtcdServer.Compact etcdserver.EtcdServer.processInternalRaftRequestOnce #因为compact操作会改变集群状态,所以要走一遍raft流程, #记录一条compact日志到wal日志,然后etcdserver apply这条日志 #因为改变数据库的操作有很多,比如delete、put、compact, #所以etcdserver apply的时候会根据data字段中请求的类型来进行一个dispatch操作 # compact的raft流程同put,因为走raft流程时,raft是不关心日志内容的, # 所以put、delete、compact等都用的同一套流程,同一套代码, # 只有在apply的时候需要根据日志类型来进行一个dispatch操作,所以这里就直接跳到apply了 ......raft 流程,与put一模一样, 略...... , return } another thread 1{ #就是etcdserver进行apply的流程,只不过dispatch时是compact etcdserver.EtcdServer.run for select: case ap <- applyc ......略去了一大串apply调用,直接来到dispatch...... apply.uberApplier.dispatch #根据req类型来调用对应的方法 case r.Compaction: #如果这条日志是一个compact请求 apply.applyV3.Compaction apply.applierV3backend.Compaction mvcc.store.Compact s.mu.Lock() #源码注释中说对于事务加读锁,对于非事务如compact/defrag则加写锁 #还没完全搞懂 mvcc.store.checkPrevCompactionCompleted #检查上一次compact操作是否完成,etcd compact的时候会往数据库里写一些数据, #所以compact之前读一下这些数据,即读一下scheduledCompactRev和finishedCompactRev, #如果读到的这两个值相同说明上一次compact操作已经完成,可再次compact #scheduledCompactRev表示已经调度的最新的compact任务对应的revision #finishedCompactRev表示已完成的最新的compact任务对应的revision #因为他是fifo调度,因为先触发的copmact任务的revision必定小于后出发的revision #所以只需要判断这两个值是否相等就可以判断之前的compact任务是否完成了 mvcc.store.updateCompactRev #往bbolt数据库写入本次compact对应的版本号,即写入scheduedCompactRev=本次compactRev s.mu.Unlock #释放数据库写锁 mvcc.store.compact #执行压缩 schedule.NewJob #compact是作为job异步执行的,所以有可能上次还没结束就又来了一个新的compact请求 mvcc.store.scheduleCompaction #1:先从treeIndex中删除,然后再遍历数据库中所有key, #删除所有不在压缩后的treeIndex中的(revision,version) #删除范围为revision属于1~compactRev的版本 mvcc.treeIndex.Compact #treeIndex包含所有elemKey,所以先遍历treeIndex的所有elemkey, #针对每个elemKey,删除他所有在compactRev之前的版本 #来得到最终剩下的所有(revision,version) #如果key所有版本都没了,那就从treeIndex删除这个key mvcc.keyIndex.compact #合并elemkey的generation, #如果key没被删除则保留最后一个(revision,version),如果被删除则删除 btree.BTreeG[T].Delete #如果该key被删除了,就从treeIndex中删除这个key for{ #压缩完treeindex就开始压缩数据库,一次删除最多删除n个数据 #!!!一次事务删除多少条数据也是可以配置的 #他就是一个for循环,一批一批的删,当删完1批,释放锁,开始删下一批之前 #这个时候是可以commit put/delete事务和rage的,个人猜测是避免一次性删除太多 #导致put/delete长时间阻塞(个人猜测,不确定) backend.batchTx.LockOutsideApply #compact数据库之前禁止继续提交事务(主要是put/delete) #compact操作和其他put/delete一样,都会竞争这个batchTxnBuffer #此时treeIndex是没有锁的,所以put/delete/range是可以正常读取和修改treeIndex backend.batchTx.lock #apply的时候会修改batchTxBuffer,会先申请batchTx锁 #compact的时候会锁住buffer,从而禁止compact期间提交apply事务 backend.batchTx.UnsafeRange #读出所有key即(revision,version),因为此时已经获取了事务锁, #不用担心其他人修改,所以unsafe读 if key not in keep #如果该key不在合并后的treeIndex中,那么就从数据库中删除该key batchTxBuffered.UnsafeDelete if cur<batchNum : #如果本次处理的小于batch大小,说明所有的都删除了,可以结束本次compact mvcc.UnsafeSetFinishedCompact #往数据库写一条数据即finishedCompactRev=compactRev,标记本次compact已经完成 backend.batchTx.Unlock #解锁 return #结束本次compact。 #!!此处没有forceCommite,也就是最后这一批对应的事务没有强制提交就返回了 #!!因为和put/delete一样,compact把数据删除操作丢到txnbuffer就不管了 #这个txnbuffer同一时刻只能有一个事务修改 #由另一个线程定时commited。总的来说:txnbuffer就是一个缓存,任何事务修改操作, #只要把操作丢到这个txnbuffer后,该事务就可以认为该操作成功了,就可以直接返回了 #备注:batchTxnBuffered和batchTx以及bbolt事务之间的关系我还云里雾里,还不懂 #只知道batchTxnBuffered里有一个batchTx,然后所有操作都是转发给batchTx #笔记:写事务在开始之前会调用s.mu.RLock加读锁,然后把操作丢到txnbuffere的时候 #对txnbuffer加锁,然后返回,然后释放s.mu.RUnlock, #这样从加读锁到释放读锁就标记着一个事务的完成,如果加了读锁,但是还没有释放读锁 #说明数据库此时还有事务没有完成,在开始一次新的compact之前会对数据库加写锁 #直到所有已有的事务完成才能成功加写锁,所以他就是用这个s.mu来判断是否有事务还没有完成 #这个s.mu只有在triggersnapshot/compact中才会加写锁,其他地方都是加读锁(defrag不知道) else: #如果达到了CompactionBatchLimit,则立即提交本次事务, #因为配置文件中配置了允许单次事务compact的最大条数 backend.backend.ForceCommit backend.batchTxBuffered.Commit backend.batchTx.lock backend.batchTxBuffered.commit backend.readTx.Lock #因为我们前面创建读事务的时候会给readTx加读锁,所有这里获取写锁会阻塞 #直到所有读事务都完成,释放读锁后,这里才可以成功加写锁 #前面已经调用LockOutsideApply禁止了其他写事务提交,所以这里不用担心并发写 #读事务会先访问treeIndex然后才去数据库读,treeIndex内部会加锁,所以是并发安全的 #compact是先压缩treeIndex,然后再去压缩数据库,如果压缩完treeIndex之后 #在compact提交写事务前,来了读事务,那么读事务会先获取treeIndex,然后再去读数据库 #compact会因获取不到readTx的写锁而阻塞在这里, #所以在compact提交前和提交后都可以安全的读,但是compact中不可以 backend.batchTx.unlock } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。