赞
踩
(1)掌握其网络通信架构
(2)场景驱动方式
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.0</version>
</dependency>
(1)创建客户端和服务端通信协议接口
/**
 * RPC protocol shared by the demo client and server.
 *
 * <p>The server implements this interface. The client obtains a proxy for it and passes
 * {@link #versionID} to {@code RPC.getProxy}.
 *
 * @author Administrator
 */
public interface ClientProtocol {

  /** Protocol version; the field name {@code versionID} is what Hadoop RPC looks for. */
  long versionID = 1234L;

  /**
   * Asks the server to create a directory; the method body runs on the server side.
   *
   * @param path the directory path to create
   */
  void makeDir(String path);
}
(2)服务端相关源码开发
public class NameNodeRpcServer implements ClientProtocol { /** * 创建⽬录 */ @Override public void makeDir(String path) { System.out.println("服务端:"+path); } // 构建服务端 public static void main(String[] args) throws Exception { Server server = new RPC.Builder(new Configuration()) .setBindAddress("localhost") .setPort(9999) .setProtocol(ClientProtocol.class) .setInstance(new NameNodeRpcServer()) .build(); //启动服务端 server.start(); } }
(3)客户端相关源码开发
/**
 * RPC client for the demo protocol.
 *
 * <p>Obtains a {@link ClientProtocol} proxy for the server at localhost:9999, invokes
 * {@code makeDir} (which executes on the server), and stops the proxy afterwards.
 *
 * @author Administrator
 */
public class DFSClient {
  public static void main(String[] args) throws IOException {
    ClientProtocol namenode = RPC.getProxy(ClientProtocol.class,
        ClientProtocol.versionID,
        new InetSocketAddress("localhost", 9999),
        new Configuration());
    try {
      namenode.makeDir("/user/opt/soft");
    } finally {
      // Stop the proxy so its RPC client resources are released even if the call fails.
      RPC.stopProxy(namenode);
    }
  }
}
(4)Hadoop RPC特性
①RPC指的是不同进程之间的调用,例如客户端调用服务端的方法,而方法实际在服务端执行;
②协议接口特征:接口里面必须定义协议版本字段versionID(字段名区分大小写,Hadoop RPC通过反射读取该字段获取协议版本);
③接口协议:服务端需要实现该协议接口(即实现协议中声明的方法);
④Hadoop RPC的服务端启动后作为一个独立的Java进程运行,使用jps命令可以看到它。
⑤服务端和客户端的创建:服务端通过RPC.Builder构建并调用start()启动,客户端通过RPC.getProxy获取协议接口的代理对象。
(1)NameNode服务管理了两个重要的表
namespace:管理了文件与block之间的关系,存储到磁盘上。
inodes:管理了Block与主机之间的关系,每次重启重新构建。
(2)NameNode服务由三个重要类支撑
①NameNode类:管理配置的参数
②NameNode Server:对外提供RPC接口和HTTP服务
③FSNamesystem:管理了HDFS的元数据
public static void main(String argv[]) throws Exception { // 1、解析參數如果异常则退出 if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) { System.exit(0); } try { StringUtils.startupShutdownMessage(NameNode.class, argv, LOG); ///2、创建NameNode NameNode namenode = createNameNode(argv, null); // 3、如果NameNode不为空则线程阻塞 if (namenode != null) { namenode.join(); } } catch (Throwable e) { LOG.error("Failed to start namenode.", e); terminate(1, e); } }
/**
 * Parses the command-line arguments and creates the NameNode for the resulting startup option.
 *
 * @param argv command-line arguments
 * @param conf configuration to use; a new {@code HdfsConfiguration} is created when null
 * @return the created NameNode, or {@code null} when {@code parseArguments} yields no startup
 *     option (usage is printed to stderr in that case)
 * @throws IOException declared; e.g. propagated from NameNode construction
 */
public static NameNode createNameNode(String argv[], Configuration conf)
    throws IOException {
  LOG.info("createNameNode " + Arrays.asList(argv));
  if (conf == null)
    conf = new HdfsConfiguration();
  // 1. Parse the arguments into a startup option.
  StartupOption startOpt = parseArguments(argv);
  // 2. No valid startup option: print usage and give up.
  if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
  // Presumably records the startup option in conf — TODO confirm.
  setStartupOption(conf, startOpt);

  // 3. Dispatch on the startup option. The non-default cases are elided in this excerpt.
  switch (startOpt) {
    ......
    default: {
      // Regular startup: initialize the metrics system, then construct the NameNode.
      DefaultMetricsSystem.initialize("NameNode");
      return new NameNode(conf);
    }
  }
}

/**
 * NameNode initialization entry point: delegates to the two-argument constructor with the
 * {@code NamenodeRole.NAMENODE} role.
 */
public NameNode(Configuration conf) throws IOException {
  this(conf, NamenodeRole.NAMENODE);
}
/**
 * Constructs a NameNode with the given role and runs {@code initialize(conf)}.
 * If initialization throws IOException or HadoopIllegalArgumentException, {@code stop()} is
 * called and the exception is rethrown; on success {@code started} is set to true.
 */
protected NameNode(Configuration conf, NamenodeRole role)
// NOTE(review): excerpt — the throws clause and opening brace of this constructor, and the
// code that defines nsId and namenodeId, are not shown here.
try {
initializeGenericKeys(conf, nsId, namenodeId);
// Main initialization routine.
initialize(conf);
} catch (IOException e) {
// Clean up before propagating the failure.
this.stop();
throw e;
} catch (HadoopIllegalArgumentException e) {
this.stop();
throw e;
}
// Only reached when initialization succeeded.
this.started.set(true);
}
/**
 * Brings up the NameNode services in a fixed order: the HTTP server (NAMENODE role only),
 * the span receiver host, the namesystem (metadata), the RPC server, the JVM pause monitor,
 * and finally the common services.
 *
 * @param conf the NameNode configuration
 * @throws IOException propagated from the individual initialization steps
 */
protected void initialize(Configuration conf) throws IOException {
  // 1. Start the HTTP server (only for the NAMENODE role).
  if (NamenodeRole.NAMENODE == role) {
    startHttpServer(conf);
  }
  this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
  // 2. Load the metadata.
  loadNamesystem(conf);
  // 3. Hadoop RPC setup.
  // NameNodeRpcServer contains two main RPC servers:
  //   (1) ClientRPCServer: used by HDFS clients to invoke HDFS operations.
  //   (2) ServiceRPCServer: calls between services (registration, heartbeats, etc.).
  rpcServer = createRpcServer(conf);
  // Create and start the JVM pause monitor and attach it to the JVM metrics.
  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
  startCommonServices(conf);
}
(1)查看httpserver相关源码
/**
 * Creates the NameNode HTTP server on its bind address, starts it and attaches the
 * startup-progress tracker.
 */
private void startHttpServer(final Configuration conf) throws IOException {
  httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
  httpServer.start();
  httpServer.setStartupProgress(startupProgress);
}

/**
 * Traced upward: returns the address the HTTP server binds to, as resolved by
 * {@code getHttpServerAddress(conf)}.
 */
protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
  InetSocketAddress bindAddress = getHttpServerAddress(conf);
  // The method declares a return type, so the resolved address must be returned.
  return bindAddress;
}

/** Traced upward: delegates to {@code getHttpAddress(conf)}. */
protected InetSocketAddress getHttpServerAddress(Configuration conf) {
  return getHttpAddress(conf);
}

/**
 * Traced upward: reads {@code dfs.namenode.http-address} (default {@code 0.0.0.0:50070}),
 * trimmed, and converts it into a socket address.
 */
public static InetSocketAddress getHttpAddress(Configuration conf) {
  return NetUtils.createSocketAddr(
      conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}

// Related configuration constants.
public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
private void startHttpServer(final Configuration conf) throws IOException { httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf)); // 启动httpserver服务 httpServer.start(); } // 1、JDK中本来有httpserver服务,hadoop自己封装了一个httpserver2服务,便于hadoop自己使用 void start() throws IOException { HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf); HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf, httpAddr, httpsAddr, "hdfs", DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY); httpServer = builder.build(); // 1.1、进行httpserver相关服务配置 httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn); httpServer.setAttribute(JspHelper.CURRENT_CONF, conf); // 1.2、设置Servlets setupServlets(httpServer, conf); httpServer.start(); // 1.3、启动httpServer服务,对外开放50070端口 httpServer.start(); } // 2、servlet相关配置,根据不同的servlet进行相关功能操作。 private static void setupServlets(HttpServer2 httpServer, Configuration conf) { httpServer.addInternalServlet("startupProgress", StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class); ... httpServer.addInternalServlet("contentSummary", "/contentSummary/*", ContentSummaryServlet.class, false); }
(2)加载元数据
// 1. Load the metadata files from disk into the namesystem field.
protected void loadNamesystem(Configuration conf) throws IOException {
  this.namesystem = FSNamesystem.loadFromDisk(conf);
}

/**
 * 2. Loading metadata: builds an FSImage over the configured namespace and edits directories,
 * creates an FSNamesystem around it and loads the image. If loading throws an IOException,
 * the image is closed and the exception is rethrown.
 */
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
  checkConfiguration(conf);
  // 2.1 Create the FSImage.
  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);
  long loadStart = monotonicNow();
  try {
    // 2.2 Load the metadata.
    namesystem.loadFSImage(startOpt);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  // NOTE(review): excerpt — no `return namesystem;` is shown although the method is declared
  // to return FSNamesystem, and loadStart is not used in the visible code.
}

/**
 * 3. Loads the FSImage: merges the image with the edit log, saves a new image when needed,
 * and opens a new edit log for writing when not coming up as an HA standby.
 */
private void loadFSImage(StartupOption startOpt) throws IOException {
  final FSImage fsImage = getFSImage();
  try {
    // We shouldn't be calling saveNamespace if we've come up in standby state.
    MetaRecoveryContext recovery = startOpt.createRecoveryContext();
    // 3.1 Merge the metadata: fsimage + editlog = new FSImage.
    final boolean staleImage
        = fsImage.recoverTransitionRead(startOpt, this, recovery);
    // A rolling-upgrade rollback or downgrade clears the rolling-upgrade info.
    if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) ||
        RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
      rollingUpgradeInfo = null;
    }
    // Save only a stale image, and never with HA enabled or during a rolling upgrade.
    final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();
    LOG.info("Need to save fs image? " + needToSave
        + " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
        + ", isRollingUpgrade=" + isRollingUpgrade() + ")");
    if (needToSave) {
      // 3.2 Write the merged FSImage to disk.
      fsImage.saveNamespace(this);
    } else {
      updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(),
          startOpt);
      // No need to save, so mark the phase done.
      StartupProgress prog = NameNode.getStartupProgress();
      prog.beginPhase(Phase.SAVING_CHECKPOINT);
      prog.endPhase(Phase.SAVING_CHECKPOINT);
    }
    // This will start a new log segment and write to the seen_txid file, so
    // we shouldn't do it when coming up in standby state
    if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
        || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
      // 3.3 Open a new EditLog file for writing.
      fsImage.openEditLogForWrite();
    }
  }
  // NOTE(review): excerpt ends here — the try above has no catch/finally shown and the
  // method's closing brace is not included.
(3)Hadoop RPC相关操作
// 1、创建NameNodeRpcServer服务 protected NameNodeRpcServer createRpcServer(Configuration conf) throws IOException { return new NameNodeRpcServer(conf, this); } // 2、启动serviceRpcServer服务,用来监听DataNode发送来的请求 class NameNodeRpcServer implements NamenodeProtocols { this.serviceRpcServer = new RPC.Builder(conf) .setProtocol( org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) .setBindAddress(bindHost) .setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()) .build(); } // 3、接口继承相关协议,namenode中的不同方法对应给不同的协议 public interface NamenodeProtocols extends ClientProtocol, DatanodeProtocol, NamenodeProtocol, RefreshAuthorizationPolicyProtocol, RefreshUserMappingsProtocol, RefreshCallQueueProtocol, GenericRefreshProtocol, GetUserMappingsProtocol, HAServiceProtocol, TraceAdminProtocol { }
// 1、查看接口相关方法 public interface ClientProtocol { public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, SnapshotAccessControlException, IOException; } // 2、真正创建mkdirs方法是RPC的服务端 public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException { checkNNStartup(); if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src); } if (!checkPathLength(src)) { throw new IOException("mkdirs: Pathname too long. Limit " + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } return namesystem.mkdirs(src, new PermissionStatus(getRemoteUser().getShortUserName(), null, masked), createParent); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。