Summary of the format step:
dfs.namenode.support.allow.format: whether formatting is allowed at all. It is advisable to configure this in production so that an accidental format cannot wipe out existing data.
NNStorage.newNamespaceInfo(): generates a fresh, random NamespaceInfo, including namespaceID, clusterID, cTime, storageType, layoutVersion, blockPoolID, and so on.
saveFSImageInAllDirs: uses FSImageSaver to persist the FSNamesystem created above, producing the fsimage file together with its md5 checksum file. At this point the txid is 0.

Command syntax: hdfs namenode [-format [-clusterid cid] [-force] [-nonInteractive]]
# Common usage, without specifying a clusterId
hdfs namenode -format
# Manually specifying a clusterId
hdfs namenode -format -clusterId aaaaa
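As a side note on dfs.namenode.support.allow.format: the guard that enforces it is checkAllowFormat(), which the format flow below calls early on. Here is a minimal sketch of such a guard, assuming the key/default constants from DFSConfigKeys; it is a simplified illustration, not the exact Hadoop source.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Simplified sketch of the allow-format guard (the real one lives in NameNode).
public final class AllowFormatGuard {
  public static void checkAllowFormat(Configuration conf) throws IOException {
    // dfs.namenode.support.allow.format defaults to true; set it to false in
    // production so that "hdfs namenode -format" refuses to run.
    if (!conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
        DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT)) {
      throw new IOException("The option "
          + DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY
          + " is set to false for this filesystem, so it cannot be formatted.");
    }
  }
}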
Entry point: the org.apache.hadoop.hdfs.server.namenode.NameNode.format method.
This step does the following:
/**
 * Verify that the configured directories exist and, when running
 * interactively, confirm with the user whether each of them should be formatted.
 */
private static boolean format(Configuration conf, boolean force,
    boolean isInteractive) throws IOException {
  // Get the configured nameservice id
  String nsId = DFSUtil.getNamenodeNameServiceId(conf);
  // Resolve the NameNode id (e.g. nn1) that corresponds to the local machine's IP
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
  // Initialize the generic configuration keys
  initializeGenericKeys(conf, nsId, namenodeId);
  // Check whether formatting is allowed; in production it is advisable to disallow it
  checkAllowFormat(conf);
  // Security (Kerberos) setup
  if (UserGroupInformation.isSecurityEnabled()) {
    InetSocketAddress socAddr = DFSUtilClient.getNNAddress(conf);
    SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
        DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
  }
  // Get the NameNode directories, i.e. where the fsimage is stored
  Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
  // Get the sharedEditsDirs configuration, i.e. the shared JournalNode addresses
  List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
  List<URI> dirsToPrompt = new ArrayList<URI>();
  dirsToPrompt.addAll(nameDirsToFormat);
  dirsToPrompt.addAll(sharedDirs);
  // Get the edits directories; when not configured they live alongside the fsimage
  List<URI> editDirsToFormat =
      FSNamesystem.getNamespaceEditsDirs(conf);

  // if clusterID is not provided - see if you can find the current one
  // Check whether a clusterID was passed on the command line; if not, generate a random one
  String clusterId = StartupOption.FORMAT.getClusterId();
  if(clusterId == null || clusterId.equals("")) {
    //Generate a new cluster id
    // CID-<uuid>
    clusterId = NNStorage.newClusterID();
  }

  LOG.info("Formatting using clusterid: {}", clusterId);

  // Create the FSImage object
  FSImage fsImage = new FSImage(conf, nameDirsToFormat, editDirsToFormat);
  FSNamesystem fsn = null;
  try {
    // Create the FSNamesystem object, which holds the NameNode metadata
    fsn = new FSNamesystem(conf, fsImage);
    // The edit log starts in the UNINITIALIZED state; the state that is ready
    // for writing is BETWEEN_LOG_SEGMENTS.
    // This call moves it to BETWEEN_LOG_SEGMENTS and initializes journalSet,
    // which holds the JournalManager objects.
    // There are two kinds of JournalManager: FileJournalManager and QuorumJournalManager.
    // FileJournalManager writes edits to local disk.
    // QuorumJournalManager writes edits to the remote shared edits service.
    fsImage.getEditLog().initJournalsForWrite();

    // Abort NameNode format if reformat is disabled and if
    // meta-dir already exists
    // Check whether re-formatting is disabled. The default is false.
    if (conf.getBoolean(DFSConfigKeys.DFS_REFORMAT_DISABLED,
        DFSConfigKeys.DFS_REFORMAT_DISABLED_DEFAULT)) {
      force = false;
      isInteractive = false;
      for (StorageDirectory sd : fsImage.storage.dirIterable(null)) {
        if (sd.hasSomeData()) {
          throw new NameNodeFormatException(
              "NameNode format aborted as reformat is disabled for "
              + "this cluster.");
        }
      }
    }

    // Interactively ask whether to format
    if (!fsImage.confirmFormat(force, isInteractive)) {
      return true; // aborted
    }

    // Start the actual formatting
    fsImage.format(fsn, clusterId, force);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception during format", ioe);
    throw ioe;
  } finally {
    if (fsImage != null) {
      fsImage.close();
    }
    if (fsn != null) {
      fsn.close();
    }
  }
  return false;
}
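The CID-<uuid> comment above is essentially all there is to NNStorage.newClusterID(): a random UUID with a CID- prefix. A self-contained sketch (hypothetical helper class mirroring that behaviour):

import java.util.UUID;

// Hypothetical helper mirroring what NNStorage.newClusterID() produces:
// a random UUID prefixed with "CID-", e.g. CID-b6aa4a27-242a-49b0-98ae-23f4122f3f6d.
public final class ClusterIdSketch {
  public static String newClusterID() {
    return "CID-" + UUID.randomUUID().toString();
  }

  public static void main(String[] args) {
    System.out.println(newClusterID());
  }
}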
Entry point: the org.apache.hadoop.hdfs.server.namenode.FSImage.format method.
This step does the following:
void format(FSNamesystem fsn, String clusterId, boolean force)
    throws IOException {
  long fileCount = fsn.getFilesTotal();
  // Expect 1 file, which is the root inode
  Preconditions.checkState(fileCount == 1,
      "FSImage.format should be called with an uninitialized namesystem, has " +
      fileCount + " files");
  // Generate the cluster information; it is eventually written to the VERSION file
  NamespaceInfo ns = NNStorage.newNamespaceInfo();
  LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
  ns.clusterID = clusterId;
  // Create the current directory under every configured NameNode directory
  // and write the VERSION / seen_txid files
  storage.format(ns);
  // Call the JournalNode format RPC
  editLog.formatNonFileJournals(ns, force);
  // Save the fsimage into all configured NameNode directories
  saveFSImageInAllDirs(fsn, 0);
}
The VERSION file written by default looks like this:
#Mon Mar 20 07:50:13 CST 2023
namespaceID=1262756384
clusterID=CID-b6aa4a27-242a-49b0-98ae-23f4122f3f6d
cTime=1679269550070
storageType=NAME_NODE
blockpoolID=BP-389782493-10.253.128.31-1679269550070
layoutVersion=-66
The seen_txid file written by default contains:
0
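The identifiers above come out of NNStorage.newNamespaceInfo(): namespaceID is a random non-zero int, cTime is the format timestamp, and blockpoolID follows the pattern BP-<random int>-<local IP>-<timestamp>. The sketch below is a hypothetical, self-contained illustration of that pattern, not the Hadoop implementation itself.

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.SecureRandom;

// Hypothetical sketch of how the VERSION identifiers above are built.
// The real logic lives in NNStorage.
public final class NamespaceIdsSketch {
  private static final SecureRandom RANDOM = new SecureRandom();

  // A random, non-zero namespaceID such as 1262756384.
  static int newNamespaceID() {
    int id;
    do {
      id = RANDOM.nextInt(Integer.MAX_VALUE);
    } while (id == 0);
    return id;
  }

  // BP-<random int>-<local IP>-<timestamp>,
  // e.g. BP-389782493-10.253.128.31-1679269550070.
  static String newBlockPoolID(long cTime) throws UnknownHostException {
    String ip = InetAddress.getLocalHost().getHostAddress();
    return "BP-" + RANDOM.nextInt(Integer.MAX_VALUE) + "-" + ip + "-" + cTime;
  }

  public static void main(String[] args) throws UnknownHostException {
    long cTime = System.currentTimeMillis();
    System.out.println("namespaceID=" + newNamespaceID());
    System.out.println("blockpoolID=" + newBlockPoolID(cTime));
  }
}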
Entry point: the org.apache.hadoop.hdfs.server.namenode.NNStorage.format method.
private void format(StorageDirectory sd) throws IOException {
  // Create the current directory (clearing it if it already exists) and set its POSIX permissions
  sd.clearDirectory(); // create current dir
  // Write the VERSION file
  writeProperties(sd);
  // Write the seen_txid file
  writeTransactionIdFile(sd, 0);
  LOG.info("Storage directory {} has been successfully formatted.",
      sd.getRoot());
}
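writeProperties() stores the StorageInfo fields as a Java properties file, which is why the VERSION file shown earlier begins with a #<date> comment line, and writeTransactionIdFile() writes the transaction id as plain text into seen_txid. Below is a simplified, hypothetical sketch of those two file formats (not the Hadoop code itself).

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Properties;

// Hypothetical sketch of the two files NNStorage.format() leaves under <dir>/current.
public final class VersionFilesSketch {

  // VERSION: stored in properties format, hence the "#<date>" header line.
  static void writeVersion(File currentDir, String namespaceID, String clusterID,
      String cTime, String blockpoolID, String layoutVersion) throws IOException {
    Properties props = new Properties();
    props.setProperty("namespaceID", namespaceID);
    props.setProperty("clusterID", clusterID);
    props.setProperty("cTime", cTime);
    props.setProperty("storageType", "NAME_NODE");
    props.setProperty("blockpoolID", blockpoolID);
    props.setProperty("layoutVersion", layoutVersion);
    try (OutputStream out = new FileOutputStream(new File(currentDir, "VERSION"))) {
      props.store(out, null);
    }
  }

  // seen_txid: just the transaction id as plain text; 0 right after a format.
  static void writeSeenTxid(File currentDir, long txid) throws IOException {
    Files.write(new File(currentDir, "seen_txid").toPath(),
        (txid + "\n").getBytes(StandardCharsets.UTF_8));
  }
}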
Now let's look directly at how the server side implements the format.
Entry point: the org.apache.hadoop.hdfs.qjournal.server.JournalNode.format method.
/**
 * nsInfo is the cluster information received over RPC.
 */
void format(NamespaceInfo nsInfo, boolean force) throws IOException {
  Preconditions.checkState(nsInfo.getNamespaceID() != 0,
      "can't format with uninitialized namespace info: %s",
      nsInfo);
  LOG.info("Formatting journal id : " + journalId + " with namespace info: " +
      nsInfo + " and force: " + force);
  // Perform the actual formatting
  storage.format(nsInfo, force);
  this.cache = createCache();
  refreshCachedData();
}
This in turn calls the org.apache.hadoop.hdfs.qjournal.server.JNStorage.format method.
void format(NamespaceInfo nsInfo, boolean force) throws IOException {
  unlockAll();
  try {
    sd.analyzeStorage(StartupOption.FORMAT, this, !force);
  } finally {
    sd.unlock();
  }
  // Copy the cluster information into this storage object
  setStorageInfo(nsInfo);
  LOG.info("Formatting journal {} with nsid: {}", sd, getNamespaceID());
  // Unlock the directory before formatting, because we will
  // re-analyze it after format(). The analyzeStorage() call
  // below is responsible for re-locking it. This is a no-op
  // if the storage is not currently locked.
  unlockAll();
  // Create the current directory, just like on the NameNode side above
  sd.clearDirectory();
  // Write the VERSION file
  writeProperties(sd);
  // Create the paxos directory
  getOrCreatePaxosDir();
  // Analyze the state of the directory
  analyzeStorage();
}
At this point the first stage is complete: the basic cluster information (VERSION, seen_txid, and so on) has been persisted to the file systems of the different components. What remains is persisting the fsimage itself.
Back to the saveFSImageInAllDirs call above (the two-argument overload used there delegates to the four-argument version shown below).
Entry point: the org.apache.hadoop.hdfs.server.namenode.FSImage.saveFSImageInAllDirs method.
private synchronized void saveFSImageInAllDirs(FSNamesystem source,
    NameNodeFile nnf, long txid, Canceler canceler) throws IOException {
  // Record the startup progress
  StartupProgress prog = NameNode.getStartupProgress();
  prog.beginPhase(Phase.SAVING_CHECKPOINT);
  if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
    throw new IOException("No image directories available!");
  }
  // Canceler used to abort the save if requested
  if (canceler == null) {
    canceler = new Canceler();
  }
  SaveNamespaceContext ctx = new SaveNamespaceContext(
      source, txid, canceler);

  try {
    List<Thread> saveThreads = new ArrayList<Thread>();
    // Save the fsimage into the current directory of every IMAGE storage directory
    // save images into current
    for (Iterator<StorageDirectory> it
        = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
      StorageDirectory sd = it.next();
      FSImageSaver saver = new FSImageSaver(ctx, sd, nnf);
      Thread saveThread = new Thread(saver, saver.toString());
      saveThreads.add(saveThread);
      saveThread.start();
    }
    waitForThreads(saveThreads);
    saveThreads.clear();
    storage.reportErrorsOnDirectories(ctx.getErrorSDs());

    if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
      throw new IOException(
          "Failed to save in any storage directories while saving namespace.");
    }
    if (canceler.isCancelled()) {
      deleteCancelledCheckpoint(txid);
      ctx.checkCancelled(); // throws
      assert false : "should have thrown above!";
    }

    renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);

    // Since we now have a new checkpoint, we can clean up some
    // old edit logs and checkpoints.
    // Do not purge anything if we just wrote a corrupted FsImage.
    if (!exitAfterSave.get()) {
      purgeOldStorage(nnf);
      archivalManager.purgeCheckpoints(NameNodeFile.IMAGE_NEW);
    }
  } finally {
    // Notify any threads waiting on the checkpoint to be canceled
    // that it is complete.
    ctx.markComplete();
    ctx = null;
  }
  prog.endPhase(Phase.SAVING_CHECKPOINT);
}
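Note how saveFSImage() (shown next) writes the image under the temporary NameNodeFile.IMAGE_NEW name first, and only the renameCheckpoint() call above promotes it to the final fsimage_<txid> name once every saver thread has finished. A hypothetical sketch of that write-then-rename pattern (the .ckpt naming here is an assumption for illustration):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of the write-then-rename pattern: the image is written
// under a temporary checkpoint name and only renamed to fsimage_<txid> once
// the save has succeeded, so readers never see a half-written image.
public final class CheckpointRenameSketch {
  static void renameCheckpoint(File currentDir, long txid) throws IOException {
    File tmp = new File(currentDir, String.format("fsimage.ckpt_%019d", txid)); // assumed temp name
    File dst = new File(currentDir, String.format("fsimage_%019d", txid));
    Files.move(tmp.toPath(), dst.toPath(), StandardCopyOption.ATOMIC_MOVE);
  }
}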
The saveFSImage step:
void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
    NameNodeFile dstType) throws IOException {
  long txid = context.getTxId();
  File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
  File dstFile = NNStorage.getStorageFile(sd, dstType, txid);

  FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context,
      conf);
  // Read the compression configuration
  FSImageCompression compression = FSImageCompression.createCompression(conf);
  // Save the image
  long numErrors = saver.save(newFile, compression);
  if (numErrors > 0) {
    // The image is likely corrupted.
    LOG.error("Detected " + numErrors + " errors while saving FsImage " +
        dstFile);
    exitAfterSave.set(true);
  }
  // Save the accompanying md5 file
  MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
  storage.setMostRecentCheckpointInfo(txid, Time.now());
}
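MD5FileUtils.saveMD5File() writes the digest of the image into the neighbouring fsimage_<txid>.md5 file, which can later be used to verify the image. A simplified, hypothetical sketch of producing such a sidecar file follows (the exact line format written by Hadoop is an assumption here):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of producing the .md5 sidecar file for an fsimage.
public final class Md5SidecarSketch {
  static void saveMD5File(File imageFile) throws IOException, NoSuchAlgorithmException {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    byte[] buf = new byte[8192];
    try (InputStream in = new FileInputStream(imageFile)) {
      int n;
      while ((n = in.read(buf)) > 0) {
        md5.update(buf, 0, n);
      }
    }
    StringBuilder hex = new StringBuilder();
    for (byte b : md5.digest()) {
      hex.append(String.format("%02x", b));
    }
    // One line: "<hex digest> *<file name>", written next to the image.
    File md5File = new File(imageFile.getParent(), imageFile.getName() + ".md5");
    Files.write(md5File.toPath(),
        (hex + " *" + imageFile.getName() + "\n").getBytes(StandardCharsets.UTF_8));
  }
}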
At this point the whole format process is complete. The NameNode directory now contains a current directory holding the VERSION and seen_txid files plus the fsimage file and its md5 file; in an HA setup the JournalNode directory also gets a current directory with a VERSION file, a paxos directory, and related files.
The resulting layout looks like this:
.
├── journal
│ └── cdp-cluster
│ ├── current
│ │ ├── committed-txid
│ │ ├── paxos
│ │ └── VERSION
│ ├── edits.sync
│ └── in_use.lock
└── namenode
└── current
├── fsimage_0000000000000000000
├── fsimage_0000000000000000000.md5
├── seen_txid
└── VERSION
7 directories, 7 files