当前位置:   article > 正文

第六章 数据中台PaaS层离线存储之HDFS源码剖析第一部分 - NameNode启动流程&HadoopRpc协议详述_数据中台源代码

数据中台源代码

1、大数据源码解读思路

(1)掌握其网络通信架构

(2)场景驱动方式

  • HDFS

    • namenode datanode启动

    • 写数据得流程

    • 更新原数据流程

    • 读数据流程

2、Hadoop RPC的Demo详述

  • 含义:远程过程调用,即不同进程的方法的调用。

在这里插入图片描述

2.1、创建pom依赖
<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>2.7.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
2.2、相关源码调试

(1)创建客户端和服务端通信协议接口

/**
* 协议
* @author Administrator
*
*/
public interface ClientProtocol {
 long versionID=1234L;
 void makeDir(String path);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

(2)服务端相关源码开发

public class NameNodeRpcServer implements ClientProtocol {
 /**
 * 创建⽬录
 */
 @Override
 public void makeDir(String path) {
 System.out.println("服务端:"+path);
 }
 
 // 构建服务端
 public static void main(String[] args) throws Exception {
     Server server = new RPC.Builder(new Configuration())
     .setBindAddress("localhost")
     .setPort(9999)
     .setProtocol(ClientProtocol.class)
     .setInstance(new NameNodeRpcServer())
     .build();
 
 //启动服务端
 server.start();
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 运行情况:服务端启动会一直等待客户端调用

在这里插入图片描述

  • 查看进程:通过JPS查看进程

在这里插入图片描述

(3)客户端相关源码开发

/**
* RPC客户端
* @author Administrator
*
*/
public class DFSClient {
    public static void main(String[] args) throws IOException {
        ClientProtocol namenode = RPC.getProxy(ClientProtocol.class,
                ClientProtocol.versionID,
                new InetSocketAddress("localhost",9999),
                new Configuration());

        namenode.makeDir("/user/opt/soft");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 查看运行结果:客户端调用服务端方法方法的执行在服务端

在这里插入图片描述

(4)Hadoop RPC特性

①RPC其实指的不同进程之间的调用,例如客户端调用服务端方法,方法的执行在服务端;

②协议接口特征:接口里面必须有VersionID;

③接口协议:服务端会实现接口(协议里面的方法);

hadoop rpc的服务端,我们在JPS时候是可以看得到的。

⑤服务端和客户端创建

  • 服务端:接受参数,创建方法。
  • 客户端:传递参数,调用服务端方法。

3、源码剖析之namenode启动流程

3.1、第一步 - Name注释概述

(1)NameNode服务管理了两个重要的表

  • namespace:管理了文件与block之间的关系,存储到磁盘上。

    • 关系固定,不会发生变化。
  • inodes:管理了Block与主机之间的关系,每次重启重新构建。

    • 关系不固定,档一个节点挂掉,文件块就会挪到另外一台服务器上。

(2)NameNode服务由三个重要类支撑

①NameNode类:管理配置的参数

②NameNode Server

  • IPC Server:开发端口等待别人调用
  • HTTP Server:开放50070界面,可以通过该界面了解HDFS的情况

③FSNameNameSystem:管理了HDFS的元数据

3.2、NameNode的main方法源码剖析
public static void main(String argv[]) throws Exception {
    // 1、解析參數如果异常则退出
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
        System.exit(0);
    }

    try {
        StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
        ///2、创建NameNode
        NameNode namenode = createNameNode(argv, null);
        // 3、如果NameNode不为空则线程阻塞
        if (namenode != null) {
            namenode.join();
        }
    } catch (Throwable e) {
        LOG.error("Failed to start namenode.", e);
        terminate(1, e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
3.3、createNameNode源码剖析
  • 集群启动参数传递:hadoop-daemon.sh start namenode
public static NameNode createNameNode(String argv[], Configuration conf) throws IOException {
    LOG.info("createNameNode " + Arrays.asList(argv));
    if (conf == null)
        conf = new HdfsConfiguration();
    // 1、解析参数
    StartupOption startOpt = parseArguments(argv);
    
    // 2、判断传入参数则执行相关枚举操作
    if (startOpt == null) {
        printUsage(System.err);
        return null;
    }
    setStartupOption(conf, startOpt);

    // 3、NameNode初始化方法
    switch (startOpt) {
            ......
                default: {
                    DefaultMetricsSystem.initialize("NameNode");
                    return new NameNode(conf);
                }
    }
}

// NameNode初始化入口
public NameNode(Configuration conf) throws IOException {
    this(conf, NamenodeRole.NAMENODE);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
3.4、NameNode初始化源码剖析
protected NameNode(Configuration conf, NamenodeRole role) 
    try {
        initializeGenericKeys(conf, nsId, namenodeId);
        // 初始化方法
        initialize(conf);
    } catch (IOException e) {
        this.stop();
        throw e;
    } catch (HadoopIllegalArgumentException e) {
        this.stop();
        throw e;
    }
    this.started.set(true);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
3.5、启动代码
protected void initialize(Configuration conf) throws IOException {
	// 1、启动HTTP服务
    if (NamenodeRole.NAMENODE == role) {
        startHttpServer(conf);
    }
    
    this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
	
    // 2、加载元数据
    loadNamesystem(conf);

    // 3、hadoop RPC配置
    // NameNodeRPCServer中有两个主要的RPC服务
    // (1)ClientRPCServer:hdfs客户端去操作HDFS方法
    // (2)ServiceRPCServer:服务之间相互进行方法调用(注册、心跳等)
    rpcServer = createRpcServer(conf);

    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

    startCommonServices(conf);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

(1)查看httpserver相关源码

  • 相关示意图如下

在这里插入图片描述

private void startHttpServer(final Configuration conf) throws IOException {
    httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
    httpServer.start();
    httpServer.setStartupProgress(startupProgress);
}

// 向上追溯
protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
    InetSocketAddress bindAddress = getHttpServerAddress(conf);
}

//向上追溯
protected InetSocketAddress getHttpServerAddress(Configuration conf) {
    return getHttpAddress(conf);
}

//向上追溯
public static InetSocketAddress getHttpAddress(Configuration conf) {
    return  NetUtils.createSocketAddr(
        conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}
// 查看相关参数
public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
public static final String  DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
public static final String  DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • httpserver启动方法查看
private void startHttpServer(final Configuration conf) throws IOException {
    httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
    // 启动httpserver服务
    httpServer.start();
    
}

// 1、JDK中本来有httpserver服务,hadoop自己封装了一个httpserver2服务,便于hadoop自己使用
void start() throws IOException {
    HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);

    HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
                                                                       httpAddr, httpsAddr, "hdfs",
                                                                       			DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
                                                                       DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);

    httpServer = builder.build();

    // 1.1、进行httpserver相关服务配置
    httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
    httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
    // 1.2、设置Servlets
    setupServlets(httpServer, conf);
    httpServer.start();
    //  1.3、启动httpServer服务,对外开放50070端口
    httpServer.start();
}

// 2、servlet相关配置,根据不同的servlet进行相关功能操作。
private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
    httpServer.addInternalServlet("startupProgress",
                                  StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
    ...
    httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
                                  ContentSummaryServlet.class, false);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

(2)加载元数据

  • 相关示意图如下:

在这里插入图片描述

// 1、从磁盘上加载元数据文件
protected void loadNamesystem(Configuration conf) throws IOException {
    this.namesystem = FSNamesystem.loadFromDisk(conf);
}

// 2、加载元数据源码分析
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {

    checkConfiguration(conf);
    // 2.1、新建FSImage文件
    FSImage fsImage = new FSImage(conf,
                                  FSNamesystem.getNamespaceDirs(conf),
                                  FSNamesystem.getNamespaceEditsDirs(conf));
    FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
    StartupOption startOpt = NameNode.getStartupOption(conf);


    long loadStart = monotonicNow();
    try {
        // 2.2、加载元数据
        namesystem.loadFSImage(startOpt);
    } catch (IOException ioe) {
        LOG.warn("Encountered exception loading fsimage", ioe);
        fsImage.close();
        throw ioe;
    }
}

// 3、加载FSImage文件
private void loadFSImage(StartupOption startOpt) throws IOException {
    final FSImage fsImage = getFSImage();
    try {
        // We shouldn't be calling saveNamespace if we've come up in standby state.
        MetaRecoveryContext recovery = startOpt.createRecoveryContext();
        // 3.1、合并元数据 : (fsimage + editlog = new FSImage)
        final boolean staleImage
            = fsImage.recoverTransitionRead(startOpt, this, recovery);
        if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) ||
            RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
            rollingUpgradeInfo = null;
        }
        final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade(); 
        LOG.info("Need to save fs image? " + needToSave
                 + " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
                 + ", isRollingUpgrade=" + isRollingUpgrade() + ")");
        if (needToSave) {
            // 3.2、将合并后的FSImage写到磁盘文件上
            fsImage.saveNamespace(this);
        } else {
            updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(),
                                                  startOpt);
            // No need to save, so mark the phase done.
            StartupProgress prog = NameNode.getStartupProgress();
            prog.beginPhase(Phase.SAVING_CHECKPOINT);
            prog.endPhase(Phase.SAVING_CHECKPOINT);
        }
        // This will start a new log segment and write to the seen_txid file, so
        // we shouldn't do it when coming up in standby state
        if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
            || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
            // 3.3、打开一个新的EditLog文件写入数据
            fsImage.openEditLogForWrite();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

(3)Hadoop RPC相关操作

  • 相关示意图如下
    • ClientRPCServer:hdfs客户端去操作HDFS方法
    • ServiceRPCServer:NameNode和DataNode服务之间相互进行方法调用(注册、心跳等)

在这里插入图片描述

// 1、创建NameNodeRpcServer服务
protected NameNodeRpcServer createRpcServer(Configuration conf)
    throws IOException {
    return new NameNodeRpcServer(conf, this);
}

// 2、启动serviceRpcServer服务,用来监听DataNode发送来的请求
class NameNodeRpcServer implements NamenodeProtocols {
      this.serviceRpcServer = new RPC.Builder(conf)
          .setProtocol(
              org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
          .setInstance(clientNNPbService)
          .setBindAddress(bindHost)
          .setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount)
          .setVerbose(false)
          .setSecretManager(namesystem.getDelegationTokenSecretManager())
          .build();
}

// 3、接口继承相关协议,namenode中的不同方法对应给不同的协议
public interface NamenodeProtocols
  extends ClientProtocol,
          DatanodeProtocol,
          NamenodeProtocol,
          RefreshAuthorizationPolicyProtocol,
          RefreshUserMappingsProtocol,
          RefreshCallQueueProtocol,
          GenericRefreshProtocol,
          GetUserMappingsProtocol,
          HAServiceProtocol,
          TraceAdminProtocol {
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 协议的方法实现:
// 1、查看接口相关方法
public interface ClientProtocol {
    public boolean mkdirs(String src, FsPermission masked, boolean createParent)
        throws AccessControlException, FileAlreadyExistsException,
    FileNotFoundException, NSQuotaExceededException,
    ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
    SnapshotAccessControlException, IOException;
}


// 2、真正创建mkdirs方法是RPC的服务端
public boolean mkdirs(String src, FsPermission masked, boolean createParent)
    throws IOException {
    checkNNStartup();
    if(stateChangeLog.isDebugEnabled()) {
        stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
    }
    if (!checkPathLength(src)) {
        throw new IOException("mkdirs: Pathname too long.  Limit " 
                              + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    return namesystem.mkdirs(src,
                             new PermissionStatus(getRemoteUser().getShortUserName(),
                                                  null, masked), createParent);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/375119
推荐阅读
相关标签
  

闽ICP备14008679号