```java
// ResourceManager.java
protected void serviceStart() throws Exception {
    ......
    super.serviceStart();
}
```
2. The parent class: CompositeService.serviceStart
```java
protected void serviceStart() throws Exception {
    // Get the list of child services
    List<Service> services = getServices();
    if (LOG.isDebugEnabled()) {
        LOG.debug(getName() + ": starting services, size=" + services.size());
    }
    // Start each service in turn; every start() call ends up in that
    // service's own serviceStart()
    for (Service service : services) {
        // start the service. If this fails that service
        // will be stopped and an exception raised
        service.start();
    }
    super.serviceStart();
}
```
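To make the delegation concrete, here is a minimal sketch of the composite idea under heavy simplification: `SimpleService`, `SimpleCompositeService`, `RecordingService` and `CompositeDemo` are hypothetical names, not Hadoop classes. It shows two properties of the loop above: children start in the order they were added, and an exception from one child ends the loop, so the remaining children are never started.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Service/CompositeService; the real
// classes also handle init/stop, listeners and state checks.
interface SimpleService {
    void start() throws Exception;
}

class SimpleCompositeService implements SimpleService {
    // Children are kept in the order they were added.
    private final List<SimpleService> services = new ArrayList<>();

    void addService(SimpleService service) {
        services.add(service);
    }

    @Override
    public void start() throws Exception {
        // Start children in registration order. An exception from one child
        // propagates at once, so the children after it are never started.
        for (SimpleService service : services) {
            service.start();
        }
    }
}

// A child that records its name when it starts, or fails on request.
class RecordingService implements SimpleService {
    private final String name;
    private final List<String> log;
    private final boolean fail;

    RecordingService(String name, List<String> log, boolean fail) {
        this.name = name;
        this.log = log;
        this.fail = fail;
    }

    @Override
    public void start() throws Exception {
        if (fail) {
            throw new Exception(name + " failed to start");
        }
        log.add(name);
    }
}

class CompositeDemo {
    // Builds a composite from the given names (the one equal to 'failing' throws)
    // and returns the names of the children that actually started.
    static List<String> startAll(String[] names, String failing) {
        List<String> log = new ArrayList<>();
        SimpleCompositeService parent = new SimpleCompositeService();
        for (String name : names) {
            parent.addService(new RecordingService(name, log, name.equals(failing)));
        }
        try {
            parent.start();
        } catch (Exception e) {
            // Expected when one child fails; 'log' shows how far start-up got.
        }
        return log;
    }
}
```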
3. One level further up: the grandparent class AbstractService
```java
public void start() {
    // Has the service already been started?
    if (isInState(STATE.STARTED)) {
        return;
    }
    //enter the started state
    synchronized (stateChangeLock) {
        // enterState() returns the previous state, so this branch runs only
        // if the service was not already STARTED
        if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
            try {
                // Record the service start time
                startTime = System.currentTimeMillis();
                // Do the actual start-up; this dispatches to the subclass's serviceStart()
                serviceStart();
                // Check that the service really did start
                if (isInState(STATE.STARTED)) {
                    //if the service started (and isn't now in a later state), notify
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Service " + getName() + " is started");
                    }
                    notifyListeners();
                }
            } catch (Exception e) {
                noteFailure(e);
                ServiceOperations.stopQuietly(LOG, this);
                throw ServiceStateException.convert(e);
            }
        }
    }
}
```
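The essence of `start()` is a template method guarded by a state check. The sketch below is a simplified model under stated assumptions: `MiniService` and `CountingService` are hypothetical, the real `ServiceStateModel`, listeners and `noteFailure()` are left out, and a plain `RuntimeException` stands in for `ServiceStateException.convert(e)`. A second `start()` call is a no-op, and a failing `serviceStart()` leaves the service STOPPED before the exception is rethrown.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of AbstractService.start(): a template method that
// guards the state transition and delegates the real work to serviceStart().
abstract class MiniService {
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    private final Object stateChangeLock = new Object();
    private volatile State state = State.INITED; // assume init() has already run
    private long startTime;

    State getState() { return state; }

    long getStartTime() { return startTime; }

    // Subclasses put their actual start-up logic here.
    protected abstract void serviceStart() throws Exception;

    protected void serviceStop() throws Exception { }

    public final void start() {
        if (state == State.STARTED) {
            return; // already started: a second call is a no-op
        }
        synchronized (stateChangeLock) {
            if (state == State.STARTED) {
                return; // another thread won the race
            }
            state = State.STARTED; // enter STARTED before running subclass code
            try {
                startTime = System.currentTimeMillis();
                serviceStart();
            } catch (Exception e) {
                stop(); // plays the role of ServiceOperations.stopQuietly(LOG, this)
                throw new RuntimeException(e); // stand-in for ServiceStateException.convert(e)
            }
        }
    }

    public final void stop() {
        synchronized (stateChangeLock) {
            if (state == State.STOPPED) {
                return;
            }
            state = State.STOPPED;
            try {
                serviceStop();
            } catch (Exception ignored) {
                // stopQuietly logs and swallows the failure; this sketch only swallows it
            }
        }
    }
}

// A service that counts how often serviceStart() runs and can be told to fail.
class CountingService extends MiniService {
    final AtomicInteger starts = new AtomicInteger();
    private final boolean fail;

    CountingService(boolean fail) { this.fail = fail; }

    @Override
    protected void serviceStart() throws Exception {
        starts.incrementAndGet();
        if (fail) {
            throw new Exception("simulated start-up failure");
        }
    }
}
```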
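4. Finally, control reaches the subclass's own serviceStart(), which starts the actual service. The code below is from RMSecretManagerService, the first service in the list at the end of this article:

```java
@Override
public void serviceStart() throws Exception {
    // Start the token secret managers (their roles will be covered later);
    // each of them is driven by a Timer
    amRmTokenSecretManager.start();
    containerTokenSecretManager.start();
    nmTokenSecretManager.start();

    try {
        // Start the thread that removes expired delegation tokens
        rmDTSecretManager.startThreads();
    } catch(IOException ie) {
        throw new YarnRuntimeException("Failed to start secret manager threads", ie);
    }
    super.serviceStart();
}
```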
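The comment above says each token secret manager is driven by a `Timer`. Assuming that means a periodic master-key roll, a minimal sketch with `java.util.Timer` could look like this; `RollingKeyManager`, `rollIntervalMs` and the 32-byte random key are illustrative assumptions, not the actual Hadoop implementation.

```java
import java.security.SecureRandom;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a secret manager whose master key is rolled on a fixed
// schedule by a java.util.Timer. Not Hadoop's actual implementation.
class RollingKeyManager {
    private final long rollIntervalMs;
    private final SecureRandom random = new SecureRandom();
    private final CountDownLatch rolls; // counts down once per roll, for callers that wait
    private Timer timer;
    private int currentKeyId = 0;
    private byte[] currentKey;

    RollingKeyManager(long rollIntervalMs, int rollsToAwait) {
        this.rollIntervalMs = rollIntervalMs;
        this.rolls = new CountDownLatch(rollsToAwait);
    }

    synchronized void start() {
        // Daemon thread, so the timer alone never keeps the JVM alive.
        timer = new Timer("master-key-roller", true);
        // Initial delay 0: the first key exists right after start(), then a new
        // key is generated every rollIntervalMs.
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                rollMasterKey();
            }
        }, 0, rollIntervalMs);
    }

    synchronized void stop() {
        if (timer != null) {
            timer.cancel(); // cancels future rolls; one already running may still finish
        }
    }

    private synchronized void rollMasterKey() {
        byte[] key = new byte[32];
        random.nextBytes(key);
        currentKey = key;
        currentKeyId++;
        rolls.countDown();
    }

    synchronized int getCurrentKeyId() { return currentKeyId; }

    synchronized byte[] getCurrentKey() {
        return currentKey == null ? null : currentKey.clone();
    }

    // Starts a manager, waits (up to 5 s) for three rolls, stops it, returns the key id.
    static int demo() {
        RollingKeyManager m = new RollingKeyManager(20, 3);
        m.start();
        try {
            m.rolls.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            m.stop();
        }
        return m.getCurrentKeyId();
    }
}
```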
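Ping Checker service: an inner class of AbstractLivelinessMonitor. It repeatedly walks the list of tracked NodeManagers (this is the NMLivelinessMonitor case; the same base class also backs AMLivelinessMonitor and ContainerAllocationExpirer). Any node that has not reported within the expiry interval is considered dead and is removed from the list.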
```java
private class PingChecker implements Runnable {
    @Override
    public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            synchronized (AbstractLivelinessMonitor.this) {
                // Iterator over the tracked (live) NodeManagers
                Iterator<Map.Entry<O, Long>> iterator =
                    running.entrySet().iterator();

                //avoid calculating current time everytime in loop
                long currentTime = clock.getTime();
                // Check every node: if it has not reported for longer than expireInterval
                // (yarn.nm.liveness-monitor.expiry-interval-ms, 10 minutes by default),
                // treat it as dead and remove it
                while (iterator.hasNext()) {
                    Map.Entry<O, Long> entry = iterator.next();
                    if (currentTime > entry.getValue() + expireInterval) {
                        iterator.remove();
                        expire(entry.getKey());
                        LOG.info("Expired:" + entry.getKey().toString() +
                            " Timed out after " + expireInterval/1000 + " secs");
                    }
                }
            }
            try {
                // Sleep for monitorInterval (expireInterval / 3)
                Thread.sleep(monitorInterval);
            } catch (InterruptedException e) {
                LOG.info(getName() + " thread interrupted");
                break;
            }
        }
    }
}
```
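The expiry rule in `PingChecker` can be isolated from the thread and the clock. This sketch keeps only the core data structure, a map from tracked object to last-report time. `LivenessTracker`, `checkOnce` and `isTracked` are hypothetical; `receivedPing` borrows the name of the AbstractLivelinessMonitor method but here also registers unknown objects, and passing `now` explicitly replaces the `clock` field so the rule is easy to test. A real checker thread would call `checkOnce()` in a loop and sleep for `expireInterval / 3` between passes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical core of a PingChecker-style monitor: a map from tracked object to
// the time it last reported, plus one expiry pass. "now" is passed in explicitly.
class LivenessTracker<O> {
    private final long expireIntervalMs;
    private final Map<O, Long> running = new HashMap<>();

    LivenessTracker(long expireIntervalMs) {
        this.expireIntervalMs = expireIntervalMs;
    }

    // Registration and heartbeat in one call (a simplification): record the report time.
    synchronized void receivedPing(O ob, long now) {
        running.put(ob, now);
    }

    // One pass of the checker loop: remove and return every object that timed out.
    synchronized List<O> checkOnce(long now) {
        List<O> expired = new ArrayList<>();
        Iterator<Map.Entry<O, Long>> it = running.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<O, Long> entry = it.next();
            if (now > entry.getValue() + expireIntervalMs) { // strictly past the deadline
                expired.add(entry.getKey());
                it.remove(); // Iterator.remove() is the safe way to delete while iterating
            }
        }
        return expired;
    }

    synchronized boolean isTracked(O ob) {
        return running.containsKey(ob);
    }
}
```

Because the comparison is strict, a node last seen at t = 500,000 ms with a 600,000 ms interval survives a check at exactly 1,100,000 ms and is removed by the first check after that.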
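ResourceManager Event Processor service: the worker thread of ResourceManager$SchedulerEventDispatcher. It takes scheduler events off the queue one at a time and hands each one to the scheduler: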
```java
private final class EventProcessor implements Runnable {
    @Override
    public void run() {

        SchedulerEvent event;

        while (!stopped && !Thread.currentThread().isInterrupted()) {
            try {
                // Take the next event (blocks until one is available)
                event = eventQueue.take();
            } catch (InterruptedException e) {
                LOG.error("Returning, interrupted : " + e);
                return; // TODO: Kill RM.
            }

            try {
                // Handle the event
                scheduler.handle(event);
            } catch (Throwable t) {
                .....
            }
        }
    }
}
```
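The EventProcessor is a single-consumer queue loop. This self-contained sketch assumes a generic `QueueEventProcessor` class and a `Consumer` in place of the scheduler (both hypothetical). It shows why one consumer thread gives in-order handling and how `stop()` unblocks `take()` by interrupting the thread. The error handling elided (`.....`) in the excerpt is not reproduced; the sketch only logs and continues.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the EventProcessor pattern: a single consumer thread
// blocks on a queue and hands each event to a handler (the "scheduler").
class QueueEventProcessor<E> {
    private final BlockingQueue<E> eventQueue = new LinkedBlockingQueue<>();
    private final Consumer<E> handler;
    private final Thread thread;
    private volatile boolean stopped = false;

    QueueEventProcessor(Consumer<E> handler) {
        this.handler = handler;
        this.thread = new Thread(this::runLoop, "Event Processor (sketch)");
    }

    void start() { thread.start(); }

    // Producers enqueue and return immediately; the queue is unbounded.
    void handle(E event) { eventQueue.add(event); }

    private void runLoop() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            E event;
            try {
                event = eventQueue.take(); // blocks until an event is available
            } catch (InterruptedException e) {
                return; // stop() interrupts a thread waiting in take()
            }
            try {
                handler.accept(event);
            } catch (Throwable t) {
                // The RM's real handling is elided in the excerpt; here we log and go on.
                System.err.println("Error handling event: " + t);
            }
        }
    }

    void stop() throws InterruptedException {
        stopped = true;
        thread.interrupt();
        thread.join();
    }

    // Feeds n events, waits (up to 5 s) until all are handled, then stops.
    static List<Integer> demo(int n) {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(n);
        QueueEventProcessor<Integer> p = new QueueEventProcessor<>(e -> {
            seen.add(e);
            done.countDown();
        });
        p.start();
        for (int i = 0; i < n; i++) p.handle(i);
        try {
            done.await(5, TimeUnit.SECONDS);
            p.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```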
ResourceTrackerService: an RPC server that implements the ResourceTracker interface and provides registration and heartbeat services for NodeManagers.
```java
// ResourceTrackerService.java
@Override
protected void serviceStart() throws Exception {
    super.serviceStart();
    // ResourceTrackerServer authenticates NodeManager via Kerberos if
    // security is enabled, so no secretManager.
    // Create the RPC server that implements the ResourceTracker interface; the number
    // of handler threads comes from yarn.resourcemanager.resource-tracker.client.thread-count
    Configuration conf = getConfig();
    YarnRPC rpc = YarnRPC.create(conf);
    this.server =
        rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
            conf, null,
            conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
                YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));

    // Enable service authorization?
    if (conf.getBoolean(
            CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
            false)) {
        refreshServiceAcls(conf, new RMPolicyProvider());
    }
    // Start the server
    this.server.start();
    conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
        server.getListenerAddress());
}
```
Starting the RPC server (org.apache.hadoop.ipc.Server) starts its components: mainly the Responder, the Listener, and the Handlers.
```java
public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];

    for (int i = 0; i < handlerCount; i++) {
        handlers[i] = new Handler(i);
        handlers[i].start();
    }
}
```
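Roughly, the Listener accepts connections and reads requests into a call queue, the Handler threads take calls from that queue and invoke the protocol implementation, and the Responder sends results back. The toy below models that pipeline with in-memory queues instead of sockets. `ToyRpcServer` and all of its members are hypothetical, and routing every response through the responder is a simplification: the real handlers first try to write the response themselves and leave only unfinished writes to the Responder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Toy, socket-free model of Listener -> call queue -> Handler threads -> Responder.
// Every name here is hypothetical; in-memory queues stand in for the network.
class ToyRpcServer {
    interface Step { void run() throws InterruptedException; }

    static final class Call {
        final int id;
        final String request;
        Call(int id, String request) { this.id = id; this.request = request; }
    }

    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();         // raw incoming requests
    private final BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>();       // listener -> handlers
    private final BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>(); // handlers -> responder
    private final BlockingQueue<String> sent = new LinkedBlockingQueue<>();          // "written" by the responder
    private final Function<String, String> protocolImpl;
    private final int handlerCount;
    private final List<Thread> threads = new ArrayList<>();
    private int nextCallId = 0; // only used by the listener thread

    ToyRpcServer(Function<String, String> protocolImpl, int handlerCount) {
        this.protocolImpl = protocolImpl;
        this.handlerCount = handlerCount;
    }

    // Runs step in a loop on a new daemon thread until the thread is interrupted.
    private void spawn(String name, Step step) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) step.run();
            } catch (InterruptedException e) {
                // stop() interrupts threads blocked in take()/put(); just exit
            }
        }, name);
        t.setDaemon(true);
        threads.add(t);
        t.start();
    }

    // Same order as Server.start(): responder, listener, then the handler pool.
    synchronized void start() {
        spawn("responder", () -> sent.put(responseQueue.take()));
        spawn("listener", () -> {
            String raw = inbox.take();
            callQueue.put(new Call(nextCallId++, raw));
        });
        for (int i = 0; i < handlerCount; i++) {
            spawn("handler-" + i, () -> {
                Call call = callQueue.take();
                responseQueue.put(call.id + ":" + protocolImpl.apply(call.request));
            });
        }
    }

    synchronized void stop() {
        for (Thread t : threads) t.interrupt();
    }

    void submit(String request) { inbox.add(request); }

    // Waits up to timeoutMs for the next response the responder "sent"; null on timeout.
    String awaitResponse(long timeoutMs) throws InterruptedException {
        return sent.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    static List<String> demo(List<String> requests, int handlers) {
        ToyRpcServer server = new ToyRpcServer(String::toUpperCase, handlers);
        server.start();
        requests.forEach(server::submit);
        List<String> out = new ArrayList<>();
        try {
            for (int i = 0; i < requests.size(); i++) {
                String response = server.awaitResponse(5000);
                if (response != null) out.add(response);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            server.stop();
        }
        Collections.sort(out); // handlers run concurrently, so completion order varies
        return out;
    }
}
```

With several handlers, calls finish in whatever order the handler threads complete them, which is why the demo sorts the responses by call id before returning.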
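The RPC server for clients: ClientRMService. Like ResourceTrackerService it is an RPC server, but it implements the ApplicationClientProtocol interface. All of these RPC servers are started the same way; they differ only in the protocol they implement.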
```java
@Override
protected void serviceStart() throws Exception {
    Configuration conf = getConfig();
    YarnRPC rpc = YarnRPC.create(conf);
    // The number of handler threads is set by yarn.resourcemanager.client.thread-count
    this.server =
        rpc.getServer(ApplicationClientProtocol.class, this,
            clientBindAddress,
            conf, this.rmDTSecretManager,
            conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
                YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));

    // Enable service authorization?
    if (conf.getBoolean(
            CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
            false)) {
        refreshServiceAcls(conf, new RMPolicyProvider());
    }

    this.server.start();
    clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
        server.getListenerAddress());
    super.serviceStart();
}
```
The RPC server for administrators: AdminService. It implements the ResourceManagerAdministrationProtocol interface, and its handler count is controlled by yarn.resourcemanager.admin.client.thread-count.
```java
protected void startServer() throws Exception {
    Configuration conf = getConfig();
    YarnRPC rpc = YarnRPC.create(conf);
    this.server = (Server) rpc.getServer(
        ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
        conf, null,
        conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
            YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
    .......
    this.server.start();
    conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
        server.getListenerAddress());
}
```
Starting the AsyncDispatcher event handler service:

```java
@Override
protected void serviceStart() throws Exception {
    // Call the parent's method of the same name; it currently does nothing,
    // though global initialization and the like may be added there later
    super.serviceStart();
    // Create and start a new thread; the main logic lives in createThread()
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName("AsyncDispatcher event handler");
    eventHandlingThread.start();
}
```
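Next is the body of the AsyncDispatcher thread, created by createThread() above. The thread enters its main loop and blocks on the event queue; whenever a new event arrives, it calls dispatch(event) to hand the event to its registered handler.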
```java
Runnable createThread() {
    return new Runnable() {
        @Override
        public void run() {
            // Check the stop flag and the thread's interrupt status
            while (!stopped && !Thread.currentThread().isInterrupted()) {
                drained = eventQueue.isEmpty();
                // blockNewEvents is only set when dispatcher is draining to stop,
                // adding this check is to avoid the overhead of acquiring the lock
                // and calling notify every time in the normal run of the loop.
                if (blockNewEvents) {
                    synchronized (waitForDrained) {
                        if (drained) {
                            waitForDrained.notify();
                        }
                    }
                }
                Event event;
                try {
                    // Take the next event from the queue
                    event = eventQueue.take();
                } catch(InterruptedException ie) {
                    if (!stopped) {
                        LOG.warn("AsyncDispatcher thread interrupted", ie);
                    }
                    return;
                }
                if (event != null) {
                    // Dispatch the event
                    dispatch(event);
                }
            }
        }
    };
}
```
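Putting `serviceStart()` and this loop together, a minimal dispatcher routes each event, on one thread, to the handler registered for its enum type. In this sketch `MiniDispatcher`, `Color` and `ColorEvent` are hypothetical; handlers are keyed by `getType().getDeclaringClass()`, and the drain-on-stop handshake uses a pending-event counter instead of the original `drained` flag. That counter is a deliberate simplification: it closes the window between enqueuing an event and marking the queue as not drained.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified AsyncDispatcher: one thread takes events from a queue
// and routes each one to the handler registered for its enum type.
class MiniDispatcher {
    interface Event { Enum<?> getType(); }

    enum Color { RED, GREEN }

    static final class ColorEvent implements Event {
        private final Color type;
        ColorEvent(Color type) { this.type = type; }
        @Override public Color getType() { return type; }
    }

    private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
    private final ConcurrentHashMap<Class<?>, Consumer<Event>> handlers = new ConcurrentHashMap<>();
    private final AtomicInteger pending = new AtomicInteger(); // queued + in-flight events
    private final Object waitForDrained = new Object();
    private volatile boolean stopped = false;
    private volatile boolean blockNewEvents = false;
    private Thread eventHandlingThread;

    void register(Class<? extends Enum<?>> eventType, Consumer<Event> handler) {
        handlers.put(eventType, handler);
    }

    void serviceStart() {
        eventHandlingThread = new Thread(this::runLoop, "AsyncDispatcher event handler");
        eventHandlingThread.start();
    }

    // Producers call this; once a draining stop has begun, new events are dropped.
    void handle(Event event) {
        if (blockNewEvents) return;
        pending.incrementAndGet();
        eventQueue.add(event);
    }

    private void runLoop() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            Event event;
            try {
                event = eventQueue.take(); // block until an event arrives
            } catch (InterruptedException ie) {
                return; // interrupted by serviceStop()
            }
            try {
                Consumer<Event> h = handlers.get(event.getType().getDeclaringClass());
                if (h == null) System.err.println("No handler for " + event.getType());
                else h.accept(event);
            } catch (RuntimeException e) {
                System.err.println("Handler failed: " + e);
            } finally {
                // Only take the lock when a stop is waiting for the queue to drain.
                if (pending.decrementAndGet() == 0 && blockNewEvents) {
                    synchronized (waitForDrained) { waitForDrained.notifyAll(); }
                }
            }
        }
    }

    // Refuse new events, wait until everything already queued is handled, then stop.
    void serviceStop() throws InterruptedException {
        blockNewEvents = true;
        synchronized (waitForDrained) {
            while (pending.get() > 0 && eventHandlingThread.isAlive()) {
                waitForDrained.wait(1000);
            }
        }
        stopped = true;
        eventHandlingThread.interrupt();
        eventHandlingThread.join();
    }

    static List<String> demo() {
        List<String> seen = new CopyOnWriteArrayList<>();
        MiniDispatcher d = new MiniDispatcher();
        d.register(Color.class, e -> seen.add(e.getType().name()));
        d.serviceStart();
        for (Color c : new Color[] {Color.RED, Color.GREEN, Color.RED}) d.handle(new ColorEvent(c));
        try {
            d.serviceStop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```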
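The RM runs quite a few services, and many of them are multi-threaded. An RPC server such as ClientRMService or ResourceTrackerService, for example, has 50 handler threads by default, and the RM runs several RPC servers. The full list of services in the RM is as follows: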
- Service org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService
- Service org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer
- Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED
- Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED
- Service org.apache.hadoop.yarn.server.resourcemanager.NodesListManager
- Service org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$SchedulerEventDispatcher
- Service NMLivelinessMonitor in state NMLivelinessMonitor: INITED
- Service org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService
- Service org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService
- Service org.apache.hadoop.yarn.server.resourcemanager.ClientRMService
- Service org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher
- Service Dispatcher in state Dispatcher: INITED
- Service org.apache.hadoop.yarn.server.resourcemanager.AdminService