
Hadoop YARN Startup Walkthrough

error: refusing to run as root: roott account is not found. aborting.

I. Source Code Download

The official Hadoop source download address is below. I downloaded hadoop-3.2.4, so let's dive in.

Index of /dist/hadoop/core

II. The Shell Scripts

1. start-yarn.sh

To start YARN on its own we use $HADOOP_HOME/sbin/start-yarn.sh, so let's look at what start-yarn.sh contains.

  1. #!/usr/bin/env bash
  2. ## @description usage info
  3. ## @audience private
  4. ## @stability evolving
  5. ## @replaceable no
  6. function hadoop_usage
  7. {
  8. hadoop_generate_usage "${MYNAME}" false
  9. }
  10. #获取当前文件的文件名
  11. MYNAME="${BASH_SOURCE-$0}"
  12. bin=$(cd -P -- "$(dirname -- "${MYNAME}")" >/dev/null && pwd -P)
  13. #找到 libexec
  14. if [[ -n "${HADOOP_HOME}" ]]; then
  15. HADOOP_DEFAULT_LIBEXEC_DIR="${HADOOP_HOME}/libexec"
  16. else
  17. HADOOP_DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
  18. fi
  19. HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$HADOOP_DEFAULT_LIBEXEC_DIR}"
  20. # shellcheck disable=SC2034
  21. # shellcheck扫描出的每一个错误都会有一个编号,以SC+4位数字组成
  22. # SC2034表示:变量赋值后未被使用
  23. HADOOP_NEW_CONFIG=true
  24. if [[ -f "${HADOOP_LIBEXEC_DIR}/yarn-config.sh" ]]; then
  25. . "${HADOOP_LIBEXEC_DIR}/yarn-config.sh"
  26. else
  27. echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/yarn-config.sh." 2>&1
  28. exit 1
  29. fi
  30. HADOOP_JUMBO_RETCOUNTER=0
  31. # start resourceManager
  32. # 判断集群中的yarn是否开启了HA,这里我们先看不开启HA的部分
  33. HARM=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.enabled 2>&-)
  34. if [[ ${HARM} = "false" ]]; then
  35. echo "Starting resourcemanager"
  36. #到这里会不会想 hadoop_uservar_su 是个什么东西?
  37. #hadoop_uservar_su其实是hadoop源码中common 模块中的hadoop-functions.sh中定义的一个函数
  38. #以下还有很多类似的函数,基本都在这里面定义,下面让我们看看hadoop-functions.sh
  39. hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
  40. --config "${HADOOP_CONF_DIR}" \
  41. --daemon start \
  42. resourcemanager
  43. (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
  44. else
  45. logicals=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.rm-ids 2>&-)
  46. logicals=${logicals//,/ }
  47. for id in ${logicals}
  48. do
  49. rmhost=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey "yarn.resourcemanager.hostname.${id}" 2>&-)
  50. RMHOSTS="${RMHOSTS} ${rmhost}"
  51. done
  52. echo "Starting resourcemanagers on [${RMHOSTS}]"
  53. hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
  54. --config "${HADOOP_CONF_DIR}" \
  55. --daemon start \
  56. --workers \
  57. --hostnames "${RMHOSTS}" \
  58. resourcemanager
  59. (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
  60. fi
  61. # start nodemanager
  62. echo "Starting nodemanagers"
  63. hadoop_uservar_su yarn nodemanager "${HADOOP_YARN_HOME}/bin/yarn" \
  64. --config "${HADOOP_CONF_DIR}" \
  65. --workers \
  66. --daemon start \
  67. nodemanager
  68. (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
  69. # start proxyserver
  70. PROXYSERVER=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.web-proxy.address 2>&- | cut -f1 -d:)
  71. if [[ -n ${PROXYSERVER} ]]; then
  72. hadoop_uservar_su yarn proxyserver "${HADOOP_YARN_HOME}/bin/yarn" \
  73. --config "${HADOOP_CONF_DIR}" \
  74. --workers \
  75. --hostnames "${PROXYSERVER}" \
  76. --daemon start \
  77. proxyserver
  78. (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
  79. fi
  80. exit ${HADOOP_JUMBO_RETCOUNTER}

Looking at start-yarn.sh as a whole, it starts three roles: resourcemanager, nodemanager, and proxyserver. Let's start with the resourcemanager.

When starting the resourcemanager, the script first checks whether YARN HA is enabled; here we only analyze the non-HA case.
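If you want to see which branch your own cluster would take, you can run the same `hdfs getconf` queries the script uses. A minimal sketch, assuming HADOOP_HOME points at your installation and this node carries the cluster configuration:

    # "false" (the default) means start-yarn.sh takes the non-HA branch analyzed here
    "${HADOOP_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.enabled

    # with HA enabled, the script additionally resolves the RM ids and their hostnames
    "${HADOOP_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.rm-ids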

2. hadoop-functions.sh

hadoop-functions.sh is more than 2,700 lines long; only the functions involved in YARN startup are listed here. Interested readers can study the rest on their own.

  1. #!/usr/bin/env bash
  2. ## @description 当以root身份运行时,通过su执行命令,并额外支持可能以root身份合法启动的命令
  3. ## @description (例如,datanode)(这将由start-*/stop-*脚本使用。)
  4. ## @audience private
  5. ## @stability evolving
  6. ## @replaceable no
  7. ## @param user
  8. ## @param commandstring
  9. ## @return exitstatus
  10. function hadoop_uservar_su
  11. {
  12. declare program=$1
  13. declare command=$2
  14. shift 2
  15. declare uprogram
  16. declare ucommand
  17. declare uvar
  18. declare svar
  19. if hadoop_privilege_check; then
  20. uvar=$(hadoop_build_custom_subcmd_var "${program}" "${command}" USER)
  21. svar=$(hadoop_build_custom_subcmd_var "${program}" "${command}" SECURE_USER)
  22. if [[ -n "${!uvar}" ]]; then
  23. hadoop_su "${!uvar}" "$@"
  24. elif [[ -n "${!svar}" ]]; then
  25. ## If we get here, SECURE_USER is defined but no USER is, and we are already privileged,
  26. ## so just run the command and hope for the best
  27. "$@"
  28. else
  29. hadoop_error "ERROR: Attempting to operate on ${program} ${command} as root"
  30. hadoop_error "ERROR: but there is no ${uvar} defined. Aborting operation."
  31. return 1
  32. fi
  33. else
  34. "$@"
  35. fi
  36. }
  37. ## @description 生成自定义子命令var
  38. ## @audience public
  39. ## @stability stable
  40. ## @replaceable yes
  41. ## @param command
  42. ## @param subcommand
  43. ## @param customid
  44. ## @return string
  45. function hadoop_build_custom_subcmd_var
  46. {
  47. declare program=$1
  48. declare command=$2
  49. declare custom=$3
  50. declare uprogram
  51. declare ucommand
  52. if [[ -z "${BASH_VERSINFO[0]}" ]] \
  53. || [[ "${BASH_VERSINFO[0]}" -lt 4 ]]; then
  54. uprogram=$(echo "${program}" | tr '[:lower:]' '[:upper:]')
  55. ucommand=$(echo "${command}" | tr '[:lower:]' '[:upper:]')
  56. else
  57. uprogram=${program^^}
  58. ucommand=${command^^}
  59. fi
  60. echo "${uprogram}_${ucommand}_${custom}"
  61. }
  62. ## @description 如果找到给定的用户,则在以root身份运行时通过su执行命令;
  63. ## @description 如果没有找到,则退出并失败。
  64. ## @description 否则只需运行它。(这将由start-*/stop-*脚本使用。)
  65. ## @audience private
  66. ## @stability evolving
  67. ## @replaceable yes
  68. ## @param user
  69. ## @param commandstring
  70. ## @return exitstatus
  71. function hadoop_su
  72. {
  73. declare user=$1
  74. shift
  75. if hadoop_privilege_check; then
  76. if hadoop_verify_user_resolves user; then
  77. su -l "${user}" -- "$@"
  78. else
  79. hadoop_error "ERROR: Refusing to run as root: ${user} account is not found. Aborting."
  80. return 1
  81. fi
  82. else
  83. "$@"
  84. fi
  85. }
  86. ## @description 给定一个文件名或目录,返回它的绝对版本
  87. ## @description 这是readlink的替代品,它是不可移植的
  88. ## @audience public
  89. ## @stability stable
  90. ## @param fsobj
  91. ## @replaceable no
  92. ## @return 0 success
  93. ## @return 1 failure
  94. ## @return stdout abspath
  95. function hadoop_abs
  96. {
  97. declare obj=$1
  98. declare dir
  99. declare fn
  100. declare dirret
  101. if [[ ! -e ${obj} ]]; then
  102. return 1
  103. elif [[ -d ${obj} ]]; then
  104. dir=${obj}
  105. else
  106. dir=$(dirname -- "${obj}")
  107. fn=$(basename -- "${obj}")
  108. fn="/${fn}"
  109. fi
  110. dir=$(cd -P -- "${dir}" >/dev/null 2>/dev/null && pwd -P)
  111. dirret=$?
  112. if [[ ${dirret} = 0 ]]; then
  113. echo "${dir}${fn}"
  114. return 0
  115. fi
  116. return 1
  117. }
  118. ## @description 验证是否允许${USER}执行给定的子命令。
  119. ## @audience public
  120. ## @stability stable
  121. ## @replaceable yes
  122. ## @param subcommand
  123. ## @return 1 on no re-exec needed
  124. ## @return 0 on need to re-exec
  125. function hadoop_need_reexec
  126. {
  127. declare program=$1
  128. declare command=$2
  129. declare uvar
  130. #默认false
  131. if [[ "${HADOOP_REEXECED_CMD}" = true ]]; then
  132. return 1
  133. fi
  134. # if we have privilege, and the _USER is defined, and _USER is
  135. # set to someone who isn't us, then yes, we should re-exec.
  136. # otherwise no, don't re-exec and let the system deal with it.
  137. if hadoop_privilege_check; then
  138. uvar=$(hadoop_build_custom_subcmd_var "${program}" "${command}" USER)
  139. if [[ -n ${!uvar} ]]; then
  140. if [[ ${!uvar} != "${USER}" ]]; then
  141. return 0
  142. fi
  143. fi
  144. fi
  145. return 1
  146. }
  147. ## @description 验证是否允许${USER}执行给定的子命令
  148. ## @audience public
  149. ## @stability stable
  150. ## @replaceable yes
  151. ## @param command
  152. ## @param subcommand
  153. ## @return return 0 on success
  154. ## @return exit 1 on failure
  155. function hadoop_verify_user_perm
  156. {
  157. declare program=$1
  158. declare command=$2
  159. declare uvar
  160. uvar=$(hadoop_build_custom_subcmd_var "${program}" "${command}" USER)
  161. if [[ -n ${!uvar} ]]; then
  162. if [[ ${!uvar} != "${USER}" ]]; then
  163. hadoop_error "ERROR: ${command} can only be executed by ${!uvar}."
  164. exit 1
  165. fi
  166. fi
  167. return 0
  168. }
  169. ## @description Add the HADOOP_CLIENT_OPTS variable to HADOOP_OPTS
  170. ## @description when HADOOP_SUBCMD_SUPPORTDAEMONIZATION is false
  171. ## @audience public
  172. ## @stability stable
  173. ## @replaceable yes
  174. function hadoop_add_client_opts
  175. {
  176. if [[ "${HADOOP_SUBCMD_SUPPORTDAEMONIZATION}" = false
  177. || -z "${HADOOP_SUBCMD_SUPPORTDAEMONIZATION}" ]]; then
  178. hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
  179. HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
  180. fi
  181. }
  182. ## @description Utility routine to handle --workers mode
  183. ## @audience private
  184. ## @stability evolving
  185. ## @replaceable yes
  186. ## @param commandarray
  187. function hadoop_common_worker_mode_execute
  188. {
  189. #
  190. # 输入应该是用户以数组形式给出的命令行
  191. #
  192. local argv=("$@")
  193. # 如果--workers仍在命令行上,请将其删除以防止循环。
  194. # 还可以删除--hostname和--hosts以及arg值
  195. local argsSize=${#argv[@]};
  196. for (( i = 0; i < argsSize; i++ ))
  197. do
  198. if [[ "${argv[$i]}" =~ ^--workers$ ]]; then
  199. unset argv[$i]
  200. elif [[ "${argv[$i]}" =~ ^--hostnames$ ]] ||
  201. [[ "${argv[$i]}" =~ ^--hosts$ ]]; then
  202. unset argv[$i];
  203. let i++;
  204. unset argv[$i];
  205. fi
  206. done
  207. if [[ ${QATESTMODE} = true ]]; then
  208. echo "${argv[@]}"
  209. return
  210. fi
  211. hadoop_connect_to_hosts -- "${argv[@]}"
  212. }
  213. ## @description 连接到${HADOOP_WORKERS}或${HADOOP_WORKER_NAMES}并执行命令。
  214. ## @audience private
  215. ## @stability evolving
  216. ## @replaceable yes
  217. ## @param command
  218. ## @param [...]
  219. function hadoop_connect_to_hosts
  220. {
  221. # shellcheck disable=SC2124
  222. local params="$@"
  223. local worker_file
  224. local tmpslvnames
  225. #
  226. # ssh (or whatever) to a host
  227. #
  228. # 用户可以指定主机名或主机名所在的文件(不能同时指定两者)
  229. if [[ -n "${HADOOP_WORKERS}" && -n "${HADOOP_WORKER_NAMES}" ]] ; then
  230. hadoop_error "ERROR: Both HADOOP_WORKERS and HADOOP_WORKER_NAMES were defined. Aborting."
  231. exit 1
  232. elif [[ -z "${HADOOP_WORKER_NAMES}" ]]; then
  233. if [[ -n "${HADOOP_WORKERS}" ]]; then
  234. worker_file=${HADOOP_WORKERS}
  235. elif [[ -f "${HADOOP_CONF_DIR}/workers" ]]; then
  236. worker_file=${HADOOP_CONF_DIR}/workers
  237. elif [[ -f "${HADOOP_CONF_DIR}/slaves" ]]; then
  238. hadoop_error "WARNING: 'slaves' file has been deprecated. Please use 'workers' file instead."
  239. worker_file=${HADOOP_CONF_DIR}/slaves
  240. fi
  241. fi
  242. # 如果pdsh可用,让我们使用它。否则默认为ssh循环。(啊)
  243. if [[ -e '/usr/bin/pdsh' ]]; then
  244. if [[ -z "${HADOOP_WORKER_NAMES}" ]] ; then
  245. # 如果给了我们一个文件,就让pdsh来处理它
  246. # shellcheck disable=SC2086
  247. PDSH_SSH_ARGS_APPEND="${HADOOP_SSH_OPTS}" pdsh \
  248. -f "${HADOOP_SSH_PARALLEL}" -w ^"${worker_file}" $"${@// /\\ }" 2>&1
  249. else
  250. # pdsh-arg主机列表中不允许有空格
  251. # shellcheck disable=SC2086
  252. tmpslvnames=$(echo ${HADOOP_WORKER_NAMES} | tr -s ' ' ,)
  253. PDSH_SSH_ARGS_APPEND="${HADOOP_SSH_OPTS}" pdsh \
  254. -f "${HADOOP_SSH_PARALLEL}" \
  255. -w "${tmpslvnames}" $"${@// /\\ }" 2>&1
  256. fi
  257. else
  258. if [[ -z "${HADOOP_WORKER_NAMES}" ]]; then
  259. HADOOP_WORKER_NAMES=$(sed 's/#.*$//;/^$/d' "${worker_file}")
  260. fi
  261. hadoop_connect_to_hosts_without_pdsh "${params}"
  262. fi
  263. }
  264. ## @description 连接到${HADOOP_WORKER_NAMES}并在不支持pdsh的环境下执行命令
  265. ## @audience private
  266. ## @stability evolving
  267. ## @replaceable yes
  268. ## @param command
  269. ## @param [...]
  270. function hadoop_connect_to_hosts_without_pdsh
  271. {
  272. # shellcheck disable=SC2124
  273. local params="$@"
  274. local workers=(${HADOOP_WORKER_NAMES})
  275. for (( i = 0; i < ${#workers[@]}; i++ ))
  276. do
  277. if (( i != 0 && i % HADOOP_SSH_PARALLEL == 0 )); then
  278. wait
  279. fi
  280. # shellcheck disable=SC2086
  281. hadoop_actual_ssh "${workers[$i]}" ${params} &
  282. done
  283. wait
  284. }
  285. ## @description 通过ssh,登录“hostname”并运行“command”`
  286. ## @audience private
  287. ## @stability evolving
  288. ## @replaceable yes
  289. ## @param hostname
  290. ## @param command
  291. ## @param [...]
  292. function hadoop_actual_ssh
  293. {
  294. # 我们将此函数传递给xargs,应获取hostname,然后是命令行的其余部分
  295. local worker=$1
  296. shift
  297. # shellcheck disable=SC2086
  298. ssh ${HADOOP_SSH_OPTS} ${worker} $"${@// /\\ }" 2>&1 | sed "s/^/$worker: /"
  299. }
  300. ## @description 通用shell脚本opton解析器。将HADOOP_PARSE_COUNTER设置为调用方应移位的数字
  301. ## @audience private
  302. ## @stability evolving
  303. ## @replaceable yes
  304. ## @param [parameters, typically "$@"]
  305. function hadoop_parse_args
  306. {
  307. HADOOP_DAEMON_MODE="default"
  308. HADOOP_PARSE_COUNTER=0
  309. # 并非所有命令都支持此处支持的所有选项,这些选项是:
  310. hadoop_add_option "--config dir" "Hadoop config directory"
  311. hadoop_add_option "--debug" "turn on shell script debug mode"
  312. hadoop_add_option "--help" "usage information"
  313. while true; do
  314. hadoop_debug "hadoop_parse_args: processing $1"
  315. case $1 in
  316. --buildpaths)
  317. HADOOP_ENABLE_BUILD_PATHS=true
  318. shift
  319. ((HADOOP_PARSE_COUNTER=HADOOP_PARSE_COUNTER+1))
  320. ;;
  321. --config)
  322. shift
  323. confdir=$1
  324. shift
  325. ((HADOOP_PARSE_COUNTER=HADOOP_PARSE_COUNTER+2))
  326. if [[ -d "${confdir}" ]]; then
  327. HADOOP_CONF_DIR="${confdir}"
  328. elif [[ -z "${confdir}" ]]; then
  329. hadoop_error "ERROR: No parameter provided for --config "
  330. hadoop_exit_with_usage 1
  331. else
  332. hadoop_error "ERROR: Cannot find configuration directory \"${confdir}\""
  333. hadoop_exit_with_usage 1
  334. fi
  335. ;;
  336. --daemon)
  337. shift
  338. HADOOP_DAEMON_MODE=$1
  339. shift
  340. ((HADOOP_PARSE_COUNTER=HADOOP_PARSE_COUNTER+2))
  341. if [[ -z "${HADOOP_DAEMON_MODE}" || \
  342. ! "${HADOOP_DAEMON_MODE}" =~ ^st(art|op|atus)$ ]]; then
  343. hadoop_error "ERROR: --daemon must be followed by either \"start\", \"stop\", or \"status\"."
  344. hadoop_exit_with_usage 1
  345. fi
  346. ;;
  347. --debug)
  348. shift
  349. HADOOP_SHELL_SCRIPT_DEBUG=true
  350. ((HADOOP_PARSE_COUNTER=HADOOP_PARSE_COUNTER+1))
  351. ;;
  352. --help|-help|-h|help|--h|--\?|-\?|\?)
  353. hadoop_exit_with_usage 0
  354. ;;
  355. --hostnames)
  356. shift
  357. HADOOP_WORKER_NAMES="$1"
  358. shift
  359. ((HADOOP_PARSE_COUNTER=HADOOP_PARSE_COUNTER+2))
  360. ;;
  361. --hosts)
  362. shift
  363. hadoop_populate_workers_file "$1"
  364. shift
  365. ((HADOOP_PARSE_COUNTER=HADOOP_PARSE_COUNTER+2))
  366. ;;
  367. --loglevel)
  368. shift
  369. # shellcheck disable=SC2034
  370. HADOOP_LOGLEVEL="$1"
  371. shift
  372. ((HADOOP_PARSE_COUNTER=HADOOP_PARSE_COUNTER+2))
  373. ;;
  374. --reexec)
  375. shift
  376. if [[ "${HADOOP_REEXECED_CMD}" = true ]]; then
  377. hadoop_error "ERROR: re-exec fork bomb prevention: --reexec already called"
  378. exit 1
  379. fi
  380. HADOOP_REEXECED_CMD=true
  381. ((HADOOP_PARSE_COUNTER=HADOOP_PARSE_COUNTER+1))
  382. ;;
  383. --workers)
  384. shift
  385. # shellcheck disable=SC2034
  386. HADOOP_WORKER_MODE=true
  387. ((HADOOP_PARSE_COUNTER=HADOOP_PARSE_COUNTER+1))
  388. ;;
  389. *)
  390. break
  391. ;;
  392. esac
  393. done
  394. hadoop_debug "hadoop_parse: asking caller to skip ${HADOOP_PARSE_COUNTER}"
  395. }
  396. ## @description 将自定义(程序)_(命令)_OPTS添加到HADOOP_OPTS。
  397. ## @description 还处理3.x之前版本中不推荐使用的案例。
  398. ## @audience public
  399. ## @stability evolving
  400. ## @replaceable yes
  401. ## @param program
  402. ## @param subcommand
  403. ## @return will exit on failure conditions
  404. function hadoop_subcommand_opts
  405. {
  406. declare program=$1
  407. declare command=$2
  408. declare uvar
  409. declare depvar
  410. declare uprogram
  411. declare ucommand
  412. if [[ -z "${program}" || -z "${command}" ]]; then
  413. return 1
  414. fi
  415. # bash 4 and up have built-in ways to upper and lower
  416. # case the contents of vars. This is faster than
  417. # calling tr.
  418. ## We don't call hadoop_build_custom_subcmd_var here
  419. ## since we need to construct this for the deprecation
  420. ## cases. For Hadoop 4.x, this needs to get cleaned up.
  421. if [[ -z "${BASH_VERSINFO[0]}" ]] \
  422. || [[ "${BASH_VERSINFO[0]}" -lt 4 ]]; then
  423. uprogram=$(echo "${program}" | tr '[:lower:]' '[:upper:]')
  424. ucommand=$(echo "${command}" | tr '[:lower:]' '[:upper:]')
  425. else
  426. uprogram=${program^^}
  427. ucommand=${command^^}
  428. fi
  429. uvar="${uprogram}_${ucommand}_OPTS"
  430. # Let's handle all of the deprecation cases early
  431. # HADOOP_NAMENODE_OPTS -> HDFS_NAMENODE_OPTS
  432. depvar="HADOOP_${ucommand}_OPTS"
  433. if [[ "${depvar}" != "${uvar}" ]]; then
  434. if [[ -n "${!depvar}" ]]; then
  435. hadoop_deprecate_envvar "${depvar}" "${uvar}"
  436. fi
  437. fi
  438. if [[ -n ${!uvar} ]]; then
  439. hadoop_debug "Appending ${uvar} onto HADOOP_OPTS"
  440. HADOOP_OPTS="${HADOOP_OPTS} ${!uvar}"
  441. return 0
  442. fi
  443. }
  444. ## @description 处理主程序项中的子命令
  445. ## @audience private
  446. ## @stability evolving
  447. ## @replaceable yes
  448. function hadoop_generic_java_subcmd_handler
  449. {
  450. declare priv_outfile
  451. declare priv_errfile
  452. declare priv_pidfile
  453. declare daemon_outfile
  454. declare daemon_pidfile
  455. declare secureuser
  456. # 确定守护程序是否将在安全模式下运行的默认/预期方式由hadoop_detect_priv_subcmd定义。
  457. # 如果返回true,则设置安全用户var并告诉世界我们处于安全模式
  458. if hadoop_detect_priv_subcmd "${HADOOP_SHELL_EXECNAME}" "${HADOOP_SUBCMD}"; then
  459. HADOOP_SUBCMD_SECURESERVICE=true
  460. secureuser=$(hadoop_build_custom_subcmd_var "${HADOOP_SHELL_EXECNAME}" "${HADOOP_SUBCMD}" SECURE_USER)
  461. if ! hadoop_verify_user_resolves "${secureuser}"; then
  462. hadoop_error "ERROR: User defined in ${secureuser} (${!secureuser}) does not exist. Aborting."
  463. exit 1
  464. fi
  465. HADOOP_SECURE_USER="${!secureuser}"
  466. fi
  467. # 检查我们是否在安全模式下运行。
  468. # 从上面的内容来看,第三方可以做一些不同的事情——安全服务需要一些额外的设置——
  469. # 如果是,那么我们需要定义所有的priv和daemon内容
  470. # 如果不是,那么我们只需要定义daemon的内容。
  471. # 请注意,两者之间的守护进程变量有目的地不同
  472. if [[ "${HADOOP_SUBCMD_SECURESERVICE}" = true ]]; then
  473. hadoop_subcommand_secure_opts "${HADOOP_SHELL_EXECNAME}" "${HADOOP_SUBCMD}"
  474. hadoop_verify_secure_prereq
  475. hadoop_setup_secure_service
  476. priv_outfile="${HADOOP_LOG_DIR}/privileged-${HADOOP_IDENT_STRING}-${HADOOP_SUBCMD}-${HOSTNAME}.out"
  477. priv_errfile="${HADOOP_LOG_DIR}/privileged-${HADOOP_IDENT_STRING}-${HADOOP_SUBCMD}-${HOSTNAME}.err"
  478. priv_pidfile="${HADOOP_PID_DIR}/privileged-${HADOOP_IDENT_STRING}-${HADOOP_SUBCMD}.pid"
  479. daemon_outfile="${HADOOP_LOG_DIR}/hadoop-${HADOOP_SECURE_USER}-${HADOOP_IDENT_STRING}-${HADOOP_SUBCMD}-${HOSTNAME}.out"
  480. daemon_pidfile="${HADOOP_PID_DIR}/hadoop-${HADOOP_SECURE_USER}-${HADOOP_IDENT_STRING}-${HADOOP_SUBCMD}.pid"
  481. else
  482. daemon_outfile="${HADOOP_LOG_DIR}/hadoop-${HADOOP_IDENT_STRING}-${HADOOP_SUBCMD}-${HOSTNAME}.out"
  483. daemon_pidfile="${HADOOP_PID_DIR}/hadoop-${HADOOP_IDENT_STRING}-${HADOOP_SUBCMD}.pid"
  484. fi
  485. # 我们真的处于守护进程模式吗?
  486. # 如果是,请使用守护程序记录器和相应的日志文件。
  487. if [[ "${HADOOP_DAEMON_MODE}" != "default" ]]; then
  488. HADOOP_ROOT_LOGGER="${HADOOP_DAEMON_ROOT_LOGGER}"
  489. if [[ "${HADOOP_SUBCMD_SECURESERVICE}" = true ]]; then
  490. HADOOP_LOGFILE="hadoop-${HADOOP_SECURE_USER}-${HADOOP_IDENT_STRING}-${HADOOP_SUBCMD}-${HOSTNAME}.log"
  491. else
  492. HADOOP_LOGFILE="hadoop-${HADOOP_IDENT_STRING}-${HADOOP_SUBCMD}-${HOSTNAME}.log"
  493. fi
  494. fi
  495. # 完成环境定义:系统属性、env-var、类路径等。
  496. hadoop_finalize
  497. # 完成启动守护进程的艰巨工作,或者只是执行我们的交互式java类
  498. if [[ "${HADOOP_SUBCMD_SUPPORTDAEMONIZATION}" = true ]]; then
  499. if [[ "${HADOOP_SUBCMD_SECURESERVICE}" = true ]]; then
  500. hadoop_secure_daemon_handler \
  501. "${HADOOP_DAEMON_MODE}" \
  502. "${HADOOP_SUBCMD}" \
  503. "${HADOOP_SECURE_CLASSNAME}" \
  504. "${daemon_pidfile}" \
  505. "${daemon_outfile}" \
  506. "${priv_pidfile}" \
  507. "${priv_outfile}" \
  508. "${priv_errfile}" \
  509. "${HADOOP_SUBCMD_ARGS[@]}"
  510. else
  511. hadoop_daemon_handler \
  512. "${HADOOP_DAEMON_MODE}" \
  513. "${HADOOP_SUBCMD}" \
  514. "${HADOOP_CLASSNAME}" \
  515. "${daemon_pidfile}" \
  516. "${daemon_outfile}" \
  517. "${HADOOP_SUBCMD_ARGS[@]}"
  518. fi
  519. exit $?
  520. else
  521. hadoop_java_exec "${HADOOP_SUBCMD}" "${HADOOP_CLASSNAME}" "${HADOOP_SUBCMD_ARGS[@]}"
  522. fi
  523. }

To sum up: the script switches to the yarn user and runs

"${HADOOP_YARN_HOME}/bin/yarn" \

--config "${HADOOP_CONF_DIR}" \

--daemon start \

resourcemanager
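The user switch itself comes from hadoop_uservar_su and hadoop_su above: when the script runs as root, it builds the variable name <PROGRAM>_<COMMAND>_USER and, if that variable is set, su's to that account. A simplified sketch of that resolution for "yarn resourcemanager" (just the shape of it, not the real implementation):

    program=yarn; command=resourcemanager
    uvar="${program^^}_${command^^}_USER"   # -> YARN_RESOURCEMANAGER_USER (bash 4+ uppercasing)
    if [[ $(id -u) -eq 0 && -n "${!uvar}" ]]; then   # roughly what hadoop_privilege_check decides
      su -l "${!uvar}" -- "${HADOOP_YARN_HOME}/bin/yarn" --config "${HADOOP_CONF_DIR}" --daemon start resourcemanager
    fi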

Next, let's look at how the yarn command implements this.

3. The yarn command

  1. #!/usr/bin/env bash
  2. #正在执行的脚本的名称
  3. HADOOP_SHELL_EXECNAME="yarn"
  4. #MYNAME会得到当前脚本名称,即:yarn
  5. MYNAME="${BASH_SOURCE-$0}"
  6. ## @description 构建yarn命令的用法文本
  7. ## @audience public
  8. ## @stability stable
  9. ## @replaceable no
  10. function hadoop_usage
  11. {
  12. hadoop_add_option "--buildpaths" "attempt to add class files from build tree"
  13. hadoop_add_option "--daemon (start|status|stop)" "operate on a daemon"
  14. hadoop_add_option "--hostnames list[,of,host,names]" "hosts to use in worker mode"
  15. hadoop_add_option "--loglevel level" "set the log4j level for this command"
  16. hadoop_add_option "--hosts filename" "list of hosts to use in worker mode"
  17. hadoop_add_option "--workers" "turn on worker mode"
  18. hadoop_add_subcommand "app|application" client "prints application(s) report/kill application/manage long running application"
  19. hadoop_add_subcommand "applicationattempt" client "prints applicationattempt(s) report"
  20. hadoop_add_subcommand "classpath" client "prints the class path needed to get the hadoop jar and the required libraries"
  21. hadoop_add_subcommand "cluster" client "prints cluster information"
  22. hadoop_add_subcommand "container" client "prints container(s) report"
  23. hadoop_add_subcommand "daemonlog" admin "get/set the log level for each daemon"
  24. hadoop_add_subcommand "envvars" client "display computed Hadoop environment variables"
  25. hadoop_add_subcommand "jar <jar>" client "run a jar file"
  26. hadoop_add_subcommand "logs" client "dump container logs"
  27. hadoop_add_subcommand "node" admin "prints node report(s)"
  28. hadoop_add_subcommand "nodemanager" daemon "run a nodemanager on each worker"
  29. hadoop_add_subcommand "proxyserver" daemon "run the web app proxy server"
  30. hadoop_add_subcommand "queue" client "prints queue information"
  31. hadoop_add_subcommand "registrydns" daemon "run the registry DNS server"
  32. hadoop_add_subcommand "resourcemanager" daemon "run the ResourceManager"
  33. hadoop_add_subcommand "rmadmin" admin "admin tools"
  34. hadoop_add_subcommand "router" daemon "run the Router daemon"
  35. hadoop_add_subcommand "schedulerconf" client "Updates scheduler configuration"
  36. hadoop_add_subcommand "scmadmin" admin "SharedCacheManager admin tools"
  37. hadoop_add_subcommand "sharedcachemanager" daemon "run the SharedCacheManager daemon"
  38. hadoop_add_subcommand "timelinereader" client "run the timeline reader server"
  39. hadoop_add_subcommand "timelineserver" daemon "run the timeline server"
  40. hadoop_add_subcommand "top" client "view cluster information"
  41. hadoop_add_subcommand "nodeattributes" client "node attributes cli client"
  42. hadoop_add_subcommand "version" client "print the version"
  43. hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
  44. }
  45. ## @description yarn命令的默认命令处理程序
  46. ## @audience public
  47. ## @stability stable
  48. ## @replaceable no
  49. ## @param CLI arguments
  50. function yarncmd_case
  51. {
  52. subcmd=$1
  53. shift
  54. case ${subcmd} in
  55. app|application|applicationattempt|container)
  56. HADOOP_CLASSNAME=org.apache.hadoop.yarn.client.cli.ApplicationCLI
  57. set -- "${subcmd}" "$@"
  58. HADOOP_SUBCMD_ARGS=("$@")
  59. local sld="${HADOOP_YARN_HOME}/${YARN_DIR},\
  60. ${HADOOP_YARN_HOME}/${YARN_LIB_JARS_DIR},\
  61. ${HADOOP_HDFS_HOME}/${HDFS_DIR},\
  62. ${HADOOP_HDFS_HOME}/${HDFS_LIB_JARS_DIR},\
  63. ${HADOOP_COMMON_HOME}/${HADOOP_COMMON_DIR},\
  64. ${HADOOP_COMMON_HOME}/${HADOOP_COMMON_LIB_JARS_DIR}"
  65. hadoop_translate_cygwin_path sld
  66. hadoop_add_param HADOOP_OPTS service.libdir "-Dservice.libdir=${sld}"
  67. ;;
  68. #......省略......
  69. nodemanager)
  70. HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
  71. hadoop_add_classpath "$HADOOP_YARN_HOME/$YARN_DIR/timelineservice/*"
  72. hadoop_add_classpath "$HADOOP_YARN_HOME/$YARN_DIR/timelineservice/lib/*"
  73. #NodeManager启动类
  74. HADOOP_CLASSNAME='org.apache.hadoop.yarn.server.nodemanager.NodeManager'
  75. # Backwards compatibility
  76. if [[ -n "${YARN_NODEMANAGER_HEAPSIZE}" ]]; then
  77. HADOOP_HEAPSIZE_MAX="${YARN_NODEMANAGER_HEAPSIZE}"
  78. fi
  79. ;;
  80. proxyserver)
  81. HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
  82. #WebAppProxyServer启动类
  83. HADOOP_CLASSNAME='org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer'
  84. # Backwards compatibility
  85. if [[ -n "${YARN_PROXYSERVER_HEAPSIZE}" ]]; then
  86. HADOOP_HEAPSIZE_MAX="${YARN_PROXYSERVER_HEAPSIZE}"
  87. fi
  88. ;;
  89. resourcemanager)
  90. HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
  91. hadoop_add_classpath "$HADOOP_YARN_HOME/$YARN_DIR/timelineservice/*"
  92. hadoop_add_classpath "$HADOOP_YARN_HOME/$YARN_DIR/timelineservice/lib/*"
  93. #ResourceManager启动类
  94. HADOOP_CLASSNAME='org.apache.hadoop.yarn.server.resourcemanager.ResourceManager'
  95. # Backwards compatibility
  96. if [[ -n "${YARN_RESOURCEMANAGER_HEAPSIZE}" ]]; then
  97. HADOOP_HEAPSIZE_MAX="${YARN_RESOURCEMANAGER_HEAPSIZE}"
  98. fi
  99. local sld="${HADOOP_YARN_HOME}/${YARN_DIR},\
  100. ${HADOOP_YARN_HOME}/${YARN_LIB_JARS_DIR},\
  101. ${HADOOP_HDFS_HOME}/${HDFS_DIR},\
  102. ${HADOOP_HDFS_HOME}/${HDFS_LIB_JARS_DIR},\
  103. ${HADOOP_COMMON_HOME}/${HADOOP_COMMON_DIR},\
  104. ${HADOOP_COMMON_HOME}/${HADOOP_COMMON_LIB_JARS_DIR}"
  105. #详细请看第2步中 hadoop_translate_cygwin_path 的处理逻辑
  106. hadoop_translate_cygwin_path sld
  107. hadoop_add_param HADOOP_OPTS service.libdir "-Dservice.libdir=${sld}"
  108. ;;
  109. #......省略......
  110. *)
  111. HADOOP_CLASSNAME="${subcmd}"
  112. if ! hadoop_validate_classname "${HADOOP_CLASSNAME}"; then
  113. hadoop_exit_with_usage 1
  114. fi
  115. ;;
  116. esac
  117. }
  118. # 找到libexec...
  119. if [[ -n "${HADOOP_HOME}" ]]; then
  120. HADOOP_DEFAULT_LIBEXEC_DIR="${HADOOP_HOME}/libexec"
  121. else
  122. bin=$(cd -P -- "$(dirname -- "${MYNAME}")" >/dev/null && pwd -P)
  123. HADOOP_DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
  124. fi
  125. HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$HADOOP_DEFAULT_LIBEXEC_DIR}"
  126. HADOOP_NEW_CONFIG=true
  127. if [[ -f "${HADOOP_LIBEXEC_DIR}/yarn-config.sh" ]]; then
  128. # shellcheck source=./hadoop-yarn-project/hadoop-yarn/bin/yarn-config.sh
  129. . "${HADOOP_LIBEXEC_DIR}/yarn-config.sh"
  130. else
  131. echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/yarn-config.sh." 2>&1
  132. exit 1
  133. fi
  134. # hadoop_abs is also defined in hadoop-functions.sh (see section 2 above)
  135. # Given a filename or directory, hadoop_abs returns its absolute path
  136. MYNAME=$(hadoop_abs "${MYNAME}")
  137. # 如果未指定参数,则显示用法
  138. # $#表示执行脚本传入参数的个数
  139. if [[ $# = 0 ]]; then
  140. hadoop_exit_with_usage 1
  141. fi
  142. # 获取参数
  143. # shift 命令左移 shift执行后可以使 后面的参数向前移动,比如 shift 可以使 $2 成为 $1 , shift 3 可以使 $4 成为 $1
  144. # 此时 HADOOP_SUBCMD=resourcemanager
  145. HADOOP_SUBCMD=$1
  146. shift
  147. #检测用户是否可以执行给定的命令
  148. #如果此节点是resourcemanager角色,那么就启动并退出脚本
  149. #hadoop_uservar_su yarn resourcemanager yarn --reexec
  150. if hadoop_need_reexec yarn "${HADOOP_SUBCMD}"; then
  151. hadoop_uservar_su yarn "${HADOOP_SUBCMD}" \
  152. "${MYNAME}" \
  153. "--reexec" \
  154. "${HADOOP_USER_PARAMS[@]}"
  155. exit $?
  156. fi
  157. #验证是否允许 yarn 执行给定的子命令。
  158. hadoop_verify_user_perm "${HADOOP_SHELL_EXECNAME}" "${HADOOP_SUBCMD}"
  159. #得到子命令的所有参数,就像数组一样,可以得到某一个参数或全部参数
  160. HADOOP_SUBCMD_ARGS=("$@")
  161. #判断 yarn_subcommand_"${HADOOP_SUBCMD}" 函数是否存在
  162. #如果是直接执行
  163. #如果不是运行上面的 yarncmd_case 匹配到子命令进行执行
  164. if declare -f yarn_subcommand_"${HADOOP_SUBCMD}" >/dev/null 2>&1; then
  165. hadoop_debug "Calling dynamically: yarn_subcommand_${HADOOP_SUBCMD} ${HADOOP_SUBCMD_ARGS[*]}"
  166. "yarn_subcommand_${HADOOP_SUBCMD}" "${HADOOP_SUBCMD_ARGS[@]}"
  167. else
  168. yarncmd_case "${HADOOP_SUBCMD}" "${HADOOP_SUBCMD_ARGS[@]}"
  169. fi
  170. # It is unclear whether YARN_CLIENT_OPTS is actually useful as something
  171. # separate from HADOOP_CLIENT_OPTS. Someone might be using it,
  172. # so rather than deprecating it, just let it override HADOOP_CLIENT_OPTS
  173. # before we (potentially) add it to the command line
  174. if [[ -n "${YARN_CLIENT_OPTS}" ]]; then
  175. HADOOP_CLIENT_OPTS=${YARN_CLIENT_OPTS}
  176. fi
  177. # If HADOOP_SUBCMD_SUPPORTDAEMONIZATION is false, add HADOOP_CLIENT_OPTS to HADOOP_OPTS
  178. hadoop_add_client_opts
  179. # If the command line contains --workers, the command needs to be run on the other worker nodes
  180. # hadoop_common_worker_mode_execute ultimately runs the command on remote nodes over ssh (see section 2 above)
  181. if [[ ${HADOOP_WORKER_MODE} = true ]]; then
  182. hadoop_common_worker_mode_execute "${HADOOP_YARN_HOME}/bin/yarn" "${HADOOP_USER_PARAMS[@]}"
  183. exit $?
  184. fi
  185. #将自定义(程序)_(命令)_OPTS添加到HADOOP_OPTS
  186. hadoop_subcommand_opts "${HADOOP_SHELL_EXECNAME}" "${HADOOP_SUBCMD}"
  187. # 此时所有内容都在全局中,因此调用泛型处理程序
  188. # 这里会完成启动守护进程的艰巨工作,或者只是执行我们的交互式java类
  189. hadoop_generic_java_subcmd_handler

The script then executes the corresponding Java class to start the daemon. The classes for resourcemanager, nodemanager, and proxyserver are:

org.apache.hadoop.yarn.server.resourcemanager.ResourceManager

org.apache.hadoop.yarn.server.nodemanager.NodeManager

org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer
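Once the daemons are up, you can confirm these main classes are running with jps (a quick check, assuming a single-node setup where all three roles run on this host and yarn.web-proxy.address is configured so the proxy runs as its own process):

    jps -l | grep -E 'ResourceManager|NodeManager|WebAppProxyServer'
    # expected output (PIDs will differ):
    # 12345 org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
    # 12346 org.apache.hadoop.yarn.server.nodemanager.NodeManager
    # 12347 org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer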

III. The Java Side

1. ResourceManager

1.1 main

  1. public static void main(String argv[]) {
  2. Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  3. StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
  4. try {
  5. Configuration conf = new YarnConfiguration();
  6. GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
  7. argv = hParser.getRemainingArgs();
  8. // 如果格式化状态存储,则删除RMStateStore;否则正常启动
  9. if (argv.length >= 1) {
  10. if (argv[0].equals("-format-state-store")) {
  11. deleteRMStateStore(conf);
  12. } else if (argv[0].equals("-format-conf-store")) {
  13. deleteRMConfStore(conf);
  14. } else if (argv[0].equals("-remove-application-from-state-store")
  15. && argv.length == 2) {
  16. removeApplication(conf, argv[1]);
  17. } else {
  18. printUsage(System.err);
  19. }
  20. } else {
  21. ResourceManager resourceManager = new ResourceManager();
  22. //Add a shutdown hook with a priority; the higher the priority, the earlier it runs.
  23. //Shutdown hooks with the same priority run in an undefined order.
  24. //
  25. //ShutdownHookManager runs shutdown hooks in a deterministic order, higher priorities first.
  26. //The JVM runs its shutdown hooks in an undefined order or in parallel, so this class registers a single JVM shutdown hook
  27. //and runs all hooks registered with it sequentially by priority.
  28. //Unless a hook registers an explicit timeout via addShutdownHook(Runnable, int, long, TimeUnit),
  29. //the shutdown time allotted to it comes from a configuration option.
  30. //
  31. //CompositeServiceShutdownHook 是 CompositeService的JVM Shutdown挂钩,
  32. //它将在JVM关闭的情况下优雅地停止给定的CompositeService。
  33. ShutdownHookManager.get().addShutdownHook(
  34. new CompositeServiceShutdownHook(resourceManager),
  35. SHUTDOWN_HOOK_PRIORITY);
  36. //初始化,详细看该类中的serviceInit(),也就是第1.2步
  37. resourceManager.init(conf);
  38. //启动resourceManager ,详细看该类的serviceStart(),也就是第1.3步
  39. resourceManager.start();
  40. }
  41. } catch (Throwable t) {
  42. LOG.error(FATAL, "Error starting ResourceManager", t);
  43. System.exit(-1);
  44. }
  45. }
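Besides the normal startup path, main() above recognizes a few maintenance flags that operate on the RM state store instead of starting the daemon. Illustrative invocations taken straight from the argument handling above (these are normally run while the ResourceManager is stopped; <appId> is a placeholder):

    yarn resourcemanager -format-state-store                              # deleteRMStateStore(conf)
    yarn resourcemanager -format-conf-store                               # deleteRMConfStore(conf)
    yarn resourcemanager -remove-application-from-state-store <appId>     # removeApplication(conf, appId)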

1.2 serviceInit

  1. protected void serviceInit(Configuration conf) throws Exception {
  2. this.conf = conf;
  3. UserGroupInformation.setConfiguration(conf);
  4. //RMContextImpl类包含两个服务上下文。
  5. // 1、serviceContext:这些服务被称为Always-On服务。无论RM的HA状态如何,都需要始终运行的服务
  6. // 2、activeServiceCotext:活动服务上下文。只需要在活动RM上运行的服务。
  7. //注意:如果有任何新服务要添加到上下文中,请按照上面的描述将其添加到正确的上下文中。
  8. this.rmContext = new RMContextImpl();
  9. rmContext.setResourceManager(this);
  10. //实现ConfigurationProvider的基类。真正的ConfigurationProvider实现需要从中派生,并实现加载方法来实际加载配置。
  11. this.configurationProvider =
  12. ConfigurationProviderFactory.getConfigurationProvider(conf);
  13. this.configurationProvider.init(this.conf);
  14. rmContext.setConfigurationProvider(configurationProvider);
  15. //加载 core-site.xml
  16. loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
  17. //使用加载的core-site.xml进行refreshSuperUserGroupsConfiguration,
  18. //或者使用RM特定的配置先覆盖常见配置(如果存在)
  19. //查找名称以YarnConfiguration开头的所有配置。RM_PROXY_USER_PREFIX,
  20. //并通过将前缀替换为ProxyUsers为每个前缀添加一条记录。CONF_HADOOP_PROXYUSER
  21. RMServerUtils.processRMProxyUsersConf(conf);
  22. //使用属性的默认代理用户前缀刷新配置。
  23. ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);
  24. // 加载 yarn-site.xml
  25. loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
  26. //配置的健全性检查
  27. validateConfigs(this.conf);
  28. //登录前应设置HA配置
  29. //如果HA开启,验证Resource Manager HA的配置
  30. this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
  31. if (this.rmContext.isHAEnabled()) {
  32. HAUtil.verifyAndSetConfiguration(this.conf);
  33. }
  34. // 设置UGI并进行登录
  35. // 如果启用了安全性,请使用登录用户
  36. // 如果未启用安全性,则使用当前用户
  37. this.rmLoginUGI = UserGroupInformation.getCurrentUser();
  38. try {
  39. //详细见下方
  40. doSecureLogin();
  41. } catch(IOException ie) {
  42. throw new YarnRuntimeException("Failed to login", ie);
  43. }
  44. //使用setupDispatcher()注册所有AlwaysOn服务的处理程序
  45. //注册alwaysOn服务的处理程序
  46. rmDispatcher = setupDispatcher();
  47. addIfService(rmDispatcher);
  48. rmContext.setDispatcher(rmDispatcher);
  49. // 以下服务的顺序不应更改,因为服务将以相同的顺序启动
  50. // Since the elector service needs the admin service to be initialized and started first, add the admin service and then the elector
  51. adminService = createAdminService();
  52. addService(adminService);
  53. rmContext.setRMAdminService(adminService);
  54. //The elector must be added after the admin service
  55. if (this.rmContext.isHAEnabled()) {
  56. //如果RM配置为使用嵌入的领导人选举人,则初始化领导人选举人。
  57. //判断两个配置是否开启
  58. // 1、yarn.resourcemanager.ha.automatic-failover.enabled 默认true (启用自动故障切换。默认情况下,仅当启用HA时才启用)
  59. // 2、yarn.resourcemanager.ha.automatic-failover.embedded 默认true (启用嵌入式自动故障切换。默认情况下,只有在启用HA时才会启用它。嵌入式elector依赖于RM状态存储来处理围栏,主要用于与ZKRMStateStore结合使用。)
  60. if (HAUtil.isAutomaticFailoverEnabled(conf)
  61. && HAUtil.isAutomaticFailoverEmbedded(conf)) {
  62. EmbeddedElector elector = createEmbeddedElector();
  63. addIfService(elector);
  64. rmContext.setLeaderElectorService(elector);
  65. }
  66. }
  67. rmContext.setYarnConfiguration(conf);
  68. //创建 RMActiveServices 的实例并对其进行初始化。
  69. //RMActiveServices处理RM中的所有活动服务。
  70. //创建StandByTransitionRunnable (这是一个线程)
  71. //它是将RM转换为备用状态的类。同一个StandByTransitionRunnable对象可以在多个线程中使用,
  72. //但只能运行一次。这是因为RM在转换到备用状态后可以返回到活动状态,
  73. //而在旧上下文中运行的相同RM无法再次转换到待机状态。
  74. //每次RM转换到活动状态时,都会创建一个新的可运行程序。
  75. //创建RMSecretManagerService,用于密钥管理
  76. //创建ContainerAllocationExpirer 用于容器的申请和到期管理
  77. //创建AMLivelinessMonitor AM的活动监控器
  78. //创建RMAppLifetimeMonitor 此服务将根据给定的生存期值监视应用程序。如果应用程序运行超过给定时间,则应用程序将被终止。
  79. //创建RMNodeLabelsManager 节点标签管理
  80. //创建NodeAttributesManager 此类捕获属性与RM的所有交互。
  81. //创建AllocationTagsManager 应用程序/容器标签和节点/机架之间的内存映射。受约束的亲和性/反亲和性和基数放置所需。
  82. //创建PlacementConstraintManagerService 用于存储和检索放置约束的接口
  83. //在此处添加资源配置文件,因为它已被AbstractYarnScheduler使用
  84. //创建ResourceProfilesManager 资源配置文件管理器的接口。提供一个接口以获取可用配置文件和一些辅助函数的列表。
  85. //创建MultiNodeSortingManager 节点排序管理器,用于运行所有排序线程和策略。扩展SchedulerNode
  86. //创建RMDelegatedNodeLabelsUpdater 定期更新ResourceManager的节点标签映射。它从RMNodeLabelsMappingProvider收集节点标签,并通过RMNodeLabels Manager更新节点标签映射。当配置“yars.node labels.configuration type”设置为“委派集中式”时,将启用此服务。
  87. //如果设置了yarn.resourcemanager.recovery.enabled 默认false (使RM在启动后恢复状态。如果为true,则必须指定yarn.resourcemanager.store.class。)则 创建 RMStateStore
  88. //RMStateStore 实现ResourceManager状态存储的基类。负责异步通知和与YARN对象的接口。真正的存储实现需要从中派生,并实现阻塞存储和加载方法来实际存储和加载状态。
  89. //确定UserGroupInformation是使用Kerberos来确定用户身份,还是依赖于简单身份验证
  90. //如果是Kerberos,则要创建DelegationTokenRenewer(续订应用程序委派令牌的服务)
  91. //为NodesListManager注册事件处理程序
  92. //NodesListManager:负责读取主机/排除文件以限制对RM的访问等
  93. //初始化调度程序
  94. //创建ResourceScheduler YarnScheduler的子类(使用此接口与调度程序进行通信,以便分配资源、清理资源。)
  95. //可以通过 yarn.resourcemanager.scheduler.class 设置 默认是 容量调度,即:org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
  96. //注册RmAppEvents的事件处理程序
  97. //创建ApplicationEventDispatcher
  98. //注册RmAppAttemptEvents的事件处理程序
  99. //创建ApplicationAttemptEventDispatcher
  100. //为RmNodes注册事件处理程序
  101. //创建NodeEventDispatcher
  102. //Create NMLivelinessMonitor, which monitors NodeManager liveness
  103. //nm.liveness-monitor.expiry-interval-ms defaults to 600000 ms (600 s)
  104. //the monitor checks once every expiry-interval / 3, i.e. every 200 s by default
  105. //创建ResourceTrackerService 资源跟踪
  106. //初始化指标系统、JVM和日志记录相关指标
  107. //创建JvmPauseMonitor
  108. //该类设置一个简单的线程,该线程在睡眠短时间间隔的循环中运行。
  109. //如果睡眠时间明显长于其目标时间,则表示JVM或主机已暂停处理,
  110. //这可能会导致其他问题。如果检测到这样的暂停,线程将记录一条消息。
  111. //初始化预订系统
  112. //通过yarn.resourcemanager.reservation-system.enable设置启用标识 默认false
  113. //如果启用了,创建ReservationSystem
  114. //可以通过yarn.resourcemanager.reservation-system.class设置启动类
  115. //默认是CapacityReservationSystem.class
  116. //创建ApplicationMasterService
  117. //创建ApplicationACLsManager
  118. //创建QueueACLsManager
  119. //创建RMAppManager (此类管理资源管理器的应用程序列表。)
  120. //注册RMAppManagerEvents的事件处理程序
  121. //创建ClientRMService (资源管理器的客户端接口。该模块处理从客户端到资源管理器的所有rpc接口。)
  122. //创建ApplicationMasterLauncher
  123. //创建RMNMInfo (JMX bean列出所有节点管理器的状态)
  124. //是否在ResourceManager上启用服务rest api
  125. //可以通过 yarn.webapp.api-service.enable 设置 默认false
  126. //如果启用,则创建SystemServiceManager 默认实例化类为org.apache.hadoop.yarn.service.client.SystemServiceManagerImpl
  127. //SystemServiceManager实现
  128. //扫描配置系统服务路径
  129. //服务路径结构如下:
  130. //SYSTEM_SERVICE_DIR_PATH
  131. //|---- sync
  132. //| |--- user1
  133. //| | |---- service1.yarnfile
  134. //| | |---- service2.yarnfile
  135. //| |--- user2
  136. //| | |---- service1.yarnfile
  137. //| | ....
  138. //| |
  139. //|---- async
  140. //| |--- user3
  141. //| | |---- service1.yarnfile
  142. //| | |---- service2.yarnfile
  143. //| |--- user4
  144. //| | |---- service1.yarnfile
  145. //| | ....
  146. //| |
  147. //sync: 这些服务在服务同步启动时启动。这是一个阻塞服务启动
  148. //async: 这些服务在单独的线程中启动,在服务启动后没有任何延迟。非阻塞服务启动
  149. createAndInitActiveServices(false);
  150. //获取用于绑定的URL(RM的实际绑定地址)
  151. //其中可以指定绑定主机名以覆盖webAppURLWithoutScheme中的主机名。将使用webAppURLWithoutScheme中指定的端口。
  152. //可以通过 yarn.resourcemanager.bind-host 设置
  153. webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
  154. YarnConfiguration.RM_BIND_HOST,
  155. WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
  156. //创建RMApplicationHistoryWriter
  157. //ResourceManager使用这个类来编写RMApp、RMAppAttempt和RMContainer的信息。
  158. // 这些API是非阻塞的,只需安排一个写入历史事件。
  159. // 一个自包含的调度器向量将在单独的线程中处理事件,并提取将要持久化的所需字段。
  160. // 然后,提取的信息将通过ApplicationHistoryStore的实现持久化
  161. RMApplicationHistoryWriter rmApplicationHistoryWriter =
  162. createRMApplicationHistoryWriter();
  163. addService(rmApplicationHistoryWriter);
  164. rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
  165. //首先初始化RM时间线收集器,以便系统度量发布者可以绑定到它
  166. if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
  167. RMTimelineCollectorManager timelineCollectorManager =
  168. createRMTimelineCollectorManager();
  169. addService(timelineCollectorManager);
  170. rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
  171. }
  172. //创建CombinedSystemMetricsPublisher
  173. //度量系统发布者
  174. SystemMetricsPublisher systemMetricsPublisher =
  175. createSystemMetricsPublisher();
  176. addIfService(systemMetricsPublisher);
  177. rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
  178. //注册ResourceManagerMXBean
  179. //MBeans.register("ResourceManager", "ResourceManager", this);
  180. //使用标准命名约定来注册MBean
  181. registerMXBean();
  182. super.serviceInit(this.conf);
  183. }
  184. protected void doSecureLogin() throws IOException {
  185. //从配置中检索RM绑定地址
  186. InetSocketAddress socAddr = getBindAddress(conf);
  187. //以配置中指定的主体身份登录。将用户的Kerberos主体名称中的$host替换为hostname。
  188. //In non-secure mode this simply returns. If no keytab is available, an exception is thrown
  189. SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,
  190. YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName());
  191. // 如果启用了安全性 将 UGI of loginUser 设置给 rmLoginUGI
  192. if (UserGroupInformation.isSecurityEnabled()) {
  193. this.rmLoginUGI = UserGroupInformation.getLoginUser();
  194. }
  195. }

1.3 serviceStart

  1. protected void serviceStart() throws Exception {
  2. RMStateStore rmStore = rmContext.getStateStore();
  3. //无论恢复启用,状态存储都需要启动,因为应用程序需要事件才能移动到其他状态。
  4. //RMStateStore启动
  5. //实现ResourceManager状态存储的基类。
  6. //负责异步通知和与YARN对象的接口。真正的存储实现需要从中派生,并实现阻塞存储和加载方法来实际存储和加载状态。
  7. rmStore.start();
  8. //第1.2步 serviceInit() 时对其进行过初始化 ,默认false
  9. if(recoveryEnabled) {
  10. try {
  11. LOG.info("Recovery started");
  12. rmStore.checkVersion();
  13. if (rmContext.isWorkPreservingRecoveryEnabled()) {
  14. rmContext.setEpoch(rmStore.getAndIncrementEpoch());
  15. }
  16. RMState state = rmStore.loadState();
  17. recover(state);
  18. LOG.info("Recovery ended");
  19. } catch (Exception e) {
  20. // the Exception from loadState() needs to be handled for
  21. // HA and we need to give up master status if we got fenced
  22. LOG.error("Failed to load/recover state", e);
  23. throw e;
  24. }
  25. } else {
  26. //是否开启联合身份验证
  27. //可以通过 yarn.federation.enabled 设置 默认false
  28. if (HAUtil.isFederationEnabled(conf)) {
  29. long epoch = conf.getLong(YarnConfiguration.RM_EPOCH,
  30. YarnConfiguration.DEFAULT_RM_EPOCH);
  31. rmContext.setEpoch(epoch);
  32. LOG.info("Epoch set for Federation: " + epoch);
  33. }
  34. }
  35. super.serviceStart();
  36. }
  37. protected void serviceStart() throws Exception {
  38. //获取在serviceInit中添加的所有服务列表
  39. List<Service> services = getServices();
  40. if (LOG.isDebugEnabled()) {
  41. LOG.debug(getName() + ": starting services, size=" + services.size());
  42. }
  43. for (Service service : services) {
  44. //启动服务。如果失败,将停止服务并引发异常
  45. service.start();
  46. }
  47. super.serviceStart();
  48. }

2. NodeManager

Note: the startup walkthroughs below all assume everything runs on a single node.

2.1 main

  1. public static void main(String[] args) throws IOException {
  2. Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  3. StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
  4. @SuppressWarnings("resource")
  5. NodeManager nodeManager = new NodeManager();
  6. Configuration conf = new YarnConfiguration();
  7. //创建一个GenericOptionsParser以仅解析通用Hadoop参数。
  8. //getRemainingArgs()可以获得除泛型参数之外的字符串参数数组
  9. new GenericOptionsParser(conf, args);
  10. //初始化并启动NodeManager,我们在第2.2步详细看下
  11. nodeManager.initAndStartNodeManager(conf, false);
  12. }

2.2 initAndStartNodeManager

  1. private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
  2. try {
  3. //如果我们是基于Unix的系统,但没有bash,则无法启动。
  4. //Bash是在基于Unix的系统下启动容器所必需的。
  5. //也就是说容器的启动是通过Bash来启动的
  6. if (!Shell.WINDOWS) {
  7. if (!Shell.checkIsBashSupported()) {
  8. String message =
  9. "Failing NodeManager start since we're on a "
  10. + "Unix-based system but bash doesn't seem to be available.";
  11. LOG.error(message);
  12. throw new YarnRuntimeException(message);
  13. }
  14. }
  15. //如果我们正在重新启动,请移除旧的挂钩
  16. if (hasToReboot && null != nodeManagerShutdownHook) {
  17. ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
  18. }
  19. //CompositeService的JVM Shutdown挂钩,它将在JVM关闭的情况下优雅地停止给定的CompositeService。
  20. nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
  21. ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook,
  22. SHUTDOWN_HOOK_PRIORITY);
  23. //只有当从main()函数实例化NodeManager时,才应调用系统出口
  24. this.shouldExitOnShutdownEvent = true;
  25. //初始化NodeManager,详细看第2.3步(本类的serviceInit())
  26. this.init(conf);
  27. //启动NodeManager,详细看第2.4步(本类的serviceStart())
  28. this.start();
  29. } catch (Throwable t) {
  30. LOG.error("Error starting NodeManager", t);
  31. System.exit(-1);
  32. }
  33. }

2.3 serviceInit

  1. protected void serviceInit(Configuration conf) throws Exception {
  2. UserGroupInformation.setConfiguration(conf);
  3. //是否启用RM保留工作的恢复
  4. //可以通过yarn.resourcemanager.work-preserving-recovery.enabled设置,默认true
  5. rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration
  6. .RM_WORK_PRESERVING_RECOVERY_ENABLED,
  7. YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
  8. try {
  9. //初始化并启动RecoveryStore
  10. initAndStartRecoveryStore(conf);
  11. } catch (IOException e) {
  12. String recoveryDirName = conf.get(YarnConfiguration.NM_RECOVERY_DIR);
  13. throw new
  14. YarnRuntimeException("Unable to initialize recovery directory at "
  15. + recoveryDirName, e);
  16. }
  17. //创建NMContainerTokenSecretManager
  18. //NodeManager容器密钥管理
  19. //NM只保留两个主钥匙。RM知道的当前密钥和上一个滚动间隔中的密钥。
  20. NMContainerTokenSecretManager containerTokenSecretManager =
  21. new NMContainerTokenSecretManager(conf, nmStore);
  22. //创建NMTokenSecretManagerInNM
  23. //里面有一个应用到尝试任务列表的映射和尝试应用和其密钥的映射
  24. NMTokenSecretManagerInNM nmTokenSecretManager =
  25. new NMTokenSecretManagerInNM(nmStore);
  26. recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
  27. this.aclsManager = new ApplicationACLsManager(conf);
  28. //创建LocalDirsHandlerService
  29. //提供检查节点本地目录运行状况功能的类。
  30. //这通过定期检查nodemanager本地目录和nodemanager日志目录的运行状况来专门管理它们。
  31. this.dirsHandler = new LocalDirsHandlerService(metrics);
  32. //是否启用分布式计划
  33. //可以通过yarn.nodemanager.distributed-scheduling.enabled 设置 默认 false
  34. boolean isDistSchedulingEnabled =
  35. conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
  36. YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);
  37. //创建NM上下文
  38. this.context = createNMContext(containerTokenSecretManager,
  39. nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf);
  40. //创建ResourcePluginManager
  41. //管理在此NodeManager上配置的ResourcePlugin
  42. //ResourcePlugin:
  43. //ResourcePlugin是节点管理器的一个接口,可以更容易地支持新资源类型的发现/管理/隔离
  44. //主要有两部分:
  45. // createResourceHandler:
  46. // 当资源类型需要任何特殊隔离时,插件需要返回ResourceHandler。
  47. // 这将在NodeManager启动期间添加到ResourceHandlerChain中。
  48. // 当不需要特殊隔离时,返回null。
  49. // getNodeResourceHandlerInstance:
  50. // 当资源类型需要任何发现机制时,插件需要返回NodeResourceUpdaterPlugin
  51. // 例如,如果我们想在NM注册期间设置资源值或在NM-RM心跳期间发送更新,
  52. // 我们可以实现NodeResourceUpdaterPlugin
  53. // 并更新NodeHeartbeatRequest或RegisterNodeManagerRequest的字段
  54. // 这将在每次节点状态更新或节点注册时调用,请避免每次都创建新实例
  55. ResourcePluginManager pluginManager = createResourcePluginManager();
  56. //
  57. pluginManager.initialize(context);
  58. ((NMContext)context).setResourcePluginManager(pluginManager);
  59. //创建ContainerExecutor
  60. //这个类是用于在底层操作系统上启动容器的机制的抽象。所有执行器实现都必须扩展它
  61. //可以通过yarn.nodemanager.container-executor.class设置该执行器
  62. //默认是DefaultContainerExecutor.class
  63. //DefaultContainerExecuter类提供通用的容器执行服务。通过ProcessBuilder以独立于平台的方式处理流程执行
  64. //ProcessBuilder用于创建操作系统进程
  65. //原来容器的创建最终是调用java的ProcessBuilder在操作系统中创建一个进程来实现的
  66. ContainerExecutor exec = createContainerExecutor(conf);
  67. try {
  68. //运行执行器初始化步骤。验证必要的配置和权限是否到位。
  69. exec.init(context);
  70. } catch (IOException e) {
  71. throw new YarnRuntimeException("Failed to initialize container executor", e);
  72. }
  73. DeletionService del = createDeletionService(exec);
  74. addService(del);
  75. //NodeManager级调度器
  76. //AsyncDispatcher:
  77. //在单独的线程中调度事件。目前只有一个线程能做到这一点。每个事件类型类可能有多个通道,并且可以使用线程池来调度事件。
  78. this.dispatcher = createNMDispatcher();
  79. //创建NodeHealthCheckerService
  80. //提供检查节点运行状况并向要求运行状况检查器报告的服务报告的功能的类
  81. nodeHealthChecker =
  82. new NodeHealthCheckerService(
  83. getNodeHealthScriptRunner(conf), dirsHandler);
  84. addService(nodeHealthChecker);
  85. ((NMContext)context).setContainerExecutor(exec);
  86. ((NMContext)context).setDeletionService(del);
  87. //创建NodeStatusUpdaterImpl
  88. //节点状态更新,比如与ResourceManager的通信,对容器进行管理
  89. nodeStatusUpdater =
  90. createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
  91. //创建NodeLabelsProvider 负责获取节点标签
  92. nodeLabelsProvider = createNodeLabelsProvider(conf);
  93. if (nodeLabelsProvider != null) {
  94. addIfService(nodeLabelsProvider);
  95. nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider);
  96. }
  97. //创建NodeAttributesProvider 负责获取节点属性
  98. nodeAttributesProvider = createNodeAttributesProvider(conf);
  99. if (nodeAttributesProvider != null) {
  100. addIfService(nodeAttributesProvider);
  101. nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider);
  102. }
  103. //创建NodeResourceMonitorImpl
  104. //节点资源监视器的实现。它定期跟踪节点的资源利用情况,并将其报告给NM。
  105. nodeResourceMonitor = createNodeResourceMonitor();
  106. addService(nodeResourceMonitor);
  107. ((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);
  108. //创建ContainerManagerImpl 管理容器生命周期的实体
  109. containerManager =
  110. createContainerManager(context, exec, del, nodeStatusUpdater,
  111. this.aclsManager, dirsHandler);
  112. addService(containerManager);
  113. ((NMContext) context).setContainerManager(containerManager);
  114. //创建NMLogAggregationStatusTracker
  115. //用于缓存已完成应用程序的日志聚合状态。它还将定期删除旧的缓存日志聚合状态
  116. this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
  117. context);
  118. addService(nmLogAggregationStatusTracker);
  119. ((NMContext)context).setNMLogAggregationStatusTracker(
  120. this.nmLogAggregationStatusTracker);
  121. WebServer webServer = createWebServer(context, containerManager
  122. .getContainersMonitor(), this.aclsManager, dirsHandler);
  123. addService(webServer);
  124. ((NMContext) context).setWebServer(webServer);
  125. ((NMContext) context).setQueueableContainerAllocator(
  126. new OpportunisticContainerAllocator(
  127. context.getContainerTokenSecretManager()));
  128. dispatcher.register(ContainerManagerEventType.class, containerManager);
  129. dispatcher.register(NodeManagerEventType.class, this);
  130. addService(dispatcher);
  131. //创建JvmPauseMonitor
  132. //该类设置一个简单的线程,该线程在睡眠短时间间隔的循环中运行。
  133. //如果睡眠时间明显长于其目标时间,则表示JVM或主机已暂停处理,
  134. //这可能会导致其他问题。如果检测到这样的暂停,线程将记录一条消息。
  135. pauseMonitor = new JvmPauseMonitor();
  136. addService(pauseMonitor);
  137. metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
  138. //初始化度量系统
  139. DefaultMetricsSystem.initialize("NodeManager");
  140. //时间轴服务v.2是否通过配置启用
  141. if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
  142. this.nmCollectorService = createNMCollectorService(context);
  143. addService(nmCollectorService);
  144. }
  145. //StatusUpdater应该最后添加,这样它才能最后启动,这样我们就可以在向RM注册之前确保一切正常
  146. addService(nodeStatusUpdater);
  147. ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
  148. nmStore.setNodeStatusUpdater(nodeStatusUpdater);
  149. //在为添加的服务调用init之前进行安全登录。
  150. try {
  151. doSecureLogin();
  152. } catch (IOException e) {
  153. throw new YarnRuntimeException("Failed NodeManager login", e);
  154. }
  155. //注册NodeManagerMXBean
  156. //MBeans.register("NodeManager", "NodeManager", this);
  157. registerMXBean();
  158. super.serviceInit(conf);
  159. // TODO add local dirs to del
  160. }
  161. private void initAndStartRecoveryStore(Configuration conf)
  162. throws IOException {
  163. //是否启用节点管理器以在启动后恢复
  164. //可以通过yarn.nodemanager.recovery.enabled设置,默认false
  165. boolean recoveryEnabled = conf.getBoolean(
  166. YarnConfiguration.NM_RECOVERY_ENABLED,
  167. YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
  168. if (recoveryEnabled) {
  169. FileSystem recoveryFs = FileSystem.getLocal(conf);
  170. String recoveryDirName = conf.get(YarnConfiguration.NM_RECOVERY_DIR);
  171. if (recoveryDirName == null) {
  172. throw new IllegalArgumentException("Recovery is enabled but " +
  173. YarnConfiguration.NM_RECOVERY_DIR + " is not set.");
  174. }
  175. Path recoveryRoot = new Path(recoveryDirName);
  176. recoveryFs.mkdirs(recoveryRoot, new FsPermission((short)0700));
  177. nmStore = new NMLeveldbStateStoreService();
  178. } else {
  179. //未存储状态时要使用的状态存储
  180. nmStore = new NMNullStateStoreService();
  181. }
  182. nmStore.init(conf);
  183. nmStore.start();
  184. }

2.4 serviceStart

  1. protected void serviceStart() throws Exception {
  2. //获取serviceInit()时添加到服务列表,并依次启动它们
  3. List<Service> services = getServices();
  4. if (LOG.isDebugEnabled()) {
  5. LOG.debug(getName() + ": starting services, size=" + services.size());
  6. }
  7. for (Service service : services) {
  8. service.start();
  9. }
  10. super.serviceStart();
  11. }

3. WebAppProxyServer

The proxy server sits between end users and the ApplicationMaster web UI.
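Whether a standalone proxy daemon exists at all depends on yarn.web-proxy.address: start-yarn.sh (section II) only launches a proxyserver when that key resolves to a host, and the serviceInit code below throws if the daemon is started without it. A quick check (sketch):

    hdfs getconf -confKey yarn.web-proxy.address 2>&- | cut -f1 -d:
    # empty output -> no standalone proxy daemon; the proxy runs inside the ResourceManager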

3.1 main

  1. public static void main(String[] args) {
  2. Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  3. StringUtils.startupShutdownMessage(WebAppProxyServer.class, args, LOG);
  4. try {
  5. YarnConfiguration configuration = new YarnConfiguration();
  6. new GenericOptionsParser(configuration, args);
  7. //启动代理服务器
  8. WebAppProxyServer proxyServer = startServer(configuration);
  9. proxyServer.proxy.join();
  10. } catch (Throwable t) {
  11. ExitUtil.terminate(-1, t);
  12. }
  13. }
  14. protected static WebAppProxyServer startServer(Configuration configuration)
  15. throws Exception {
  16. WebAppProxyServer proxy = new WebAppProxyServer();
  17. ShutdownHookManager.get().addShutdownHook(
  18. new CompositeServiceShutdownHook(proxy), SHUTDOWN_HOOK_PRIORITY);
  19. //实际调用本类的serviceInit(),详看第3.2步
  20. proxy.init(configuration);
  21. //初始化时只添加了两个服务 WebAppProxy 和 JvmPauseMonitor
  22. //我们看下WebAppProxy 的启动,详见第3.3步
  23. proxy.start();
  24. return proxy;
  25. }

3.2 serviceInit

  1. //只添加了两个服务 WebAppProxy 和 JvmPauseMonitor
  2. protected void serviceInit(Configuration conf) throws Exception {
  3. Configuration config = new YarnConfiguration(conf);
  4. //以指定给代理的Kerberose主体身份登录
  5. doSecureLogin(conf);
  6. //构建WebAppProxy
  7. proxy = new WebAppProxy();
  8. addService(proxy);
  9. //这些在RM、NM中都讲过
  10. DefaultMetricsSystem.initialize("WebAppProxyServer");
  11. JvmMetrics jm = JvmMetrics.initSingleton("WebAppProxyServer", null);
  12. pauseMonitor = new JvmPauseMonitor();
  13. addService(pauseMonitor);
  14. jm.setPauseMonitor(pauseMonitor);
  15. super.serviceInit(config);
  16. }

3.3 WebAppProxy

  1. protected void serviceInit(Configuration conf) throws Exception {
  2. //core-default.xml中的 hadoop.security.authentication 值由kerberos和simple,默认simple
  3. String auth = conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
  4. if (auth == null || "simple".equals(auth)) {
  5. isSecurityEnabled = false;
  6. } else if ("kerberos".equals(auth)) {
  7. isSecurityEnabled = true;
  8. } else {
  9. LOG.warn("Unrecognized attribute value for " +
  10. CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION +
  11. " of " + auth);
  12. }
  13. //Address of the web proxy, as HOST:PORT
  14. //If it is not given, the proxy runs as part of the RM
  15. //先看yarn.web-proxy.address是否设置,如果没有设置再看
  16. //yarn.resourcemanager.webapp.address(RM web应用程序的http地址。如果只提供一个主机作为值,则网络应用程序将在随机端口上提供服务。)默认值为${yarn.resourcemanager.hostname}:8088
  17. String proxy = WebAppUtils.getProxyHostAndPort(conf);
  18. String[] proxyParts = proxy.split(":");
  19. proxyHost = proxyParts[0];
  20. //创建到RM/Application History Server的新连接以获取应用程序报告。
  21. fetcher = new AppReportFetcher(conf);
  22. //获取yarn.web-proxy.address的值
  23. bindAddress = conf.get(YarnConfiguration.PROXY_ADDRESS);
  24. if(bindAddress == null || bindAddress.isEmpty()) {
  25. throw new YarnRuntimeException(YarnConfiguration.PROXY_ADDRESS +
  26. " is not set so the proxy will not run.");
  27. }
  28. LOG.info("Instantiating Proxy at " + bindAddress);
  29. String[] parts = StringUtils.split(bindAddress, ':');
  30. port = 0;
  31. if (parts.length == 2) {
  32. bindAddress = parts[0];
  33. port = Integer.parseInt(parts[1]);
  34. }
  35. //从相同ACL的字符串表示构建一个新ACL。
  36. //字符串是一个以逗号分隔的用户和组列表。用户列表位于第一位,并由组列表后面的空格分隔。例如“user1,user2 group1,group2”
  37. acl = new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL,
  38. YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
  39. super.serviceInit(conf);
  40. }
  41. @Override
  42. protected void serviceStart() throws Exception {
  43. try {
  44. Configuration conf = getConfig();
  45. HttpServer2.Builder b = new HttpServer2.Builder()
  46. .setName("proxy")
  47. .addEndpoint(
  48. URI.create(WebAppUtils.getHttpSchemePrefix(conf) + bindAddress
  49. + ":" + port)).setFindPort(port == 0).setConf(getConfig())
  50. .setACL(acl);
  51. if (YarnConfiguration.useHttps(conf)) {
  52. WebAppUtils.loadSslConfiguration(b);
  53. }
  54. proxyServer = b.build();
  55. proxyServer.addServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
  56. ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
  57. proxyServer.setAttribute(FETCHER_ATTRIBUTE, fetcher);
  58. proxyServer
  59. .setAttribute(IS_SECURITY_ENABLED_ATTRIBUTE, isSecurityEnabled);
  60. proxyServer.setAttribute(PROXY_HOST_ATTRIBUTE, proxyHost);
  61. //调用HttpServer2.start()
  62. //最终调用的时org.eclipse.jetty.server.start()
  63. //启动服务器。不等待服务器启动
  64. proxyServer.start();
  65. } catch (IOException e) {
  66. LOG.error("Could not start proxy web server",e);
  67. throw e;
  68. }
  69. super.serviceStart();
  70. }
  71. @Override
  72. protected void serviceStop() throws Exception {
  73. if(proxyServer != null) {
  74. try {
  75. proxyServer.stop();
  76. } catch (Exception e) {
  77. LOG.error("Error stopping proxy web server", e);
  78. throw new YarnRuntimeException("Error stopping proxy web server",e);
  79. }
  80. }
  81. if(this.fetcher != null) {
  82. this.fetcher.stop();
  83. }
  84. super.serviceStop();
  85. }

IV. Summary

1. The user runs ./start-yarn.sh.

2. start-yarn.sh starts the resourcemanager, nodemanager, and proxyserver in turn.

3. The yarn command and hadoop-functions.sh resolve each role's startup class and launch the corresponding Java process locally or on remote nodes (over ssh).

4. resourcemanager, nodemanager, and proxyserver each initialize their own list of services.

5. resourcemanager, nodemanager, and proxyserver then start the services on that list in order; a configuration sketch for the error in the title follows below.
