赞
踩
"""Automated single-node setup of Hadoop 3.1.3, Hive 3.1.2 and MySQL 8.0 on Windows.

Downloads and unpacks the distributions, sets the required environment
variables via ``setx``, patches the Hadoop/Hive configuration files, and
prints the follow-up commands the user still has to run by hand.
"""
import os
import re
import shutil
import subprocess
import sys
import xml.etree.ElementTree as ET
from shutil import copyfile, move

from lxml import etree

# Versions and install locations.
version = '3.1.3'
hadoop = f'hadoop-{version}'
hive_version = '3.1.2'
path = 'C:\\'
directory_path = path + hadoop
java_path = r'C:\Program Files\Java\jre-1.8'
hive = f'apache-hive-{hive_version}-bin'
hive_path = path + hive
mysql_path = r'C:\Program Files\MySQL\MySQL Server 8.0'
mysql_version = '8.0.36.0'
mysql = f'mysql-installer-community-{mysql_version}.msi'

# Apache license header Hadoop ships at the top of its *-site.xml files.
_LICENSE_TEXT = (
    '\n Licensed under the Apache License, Version 2.0 (the "License");\n'
    ' you may not use this file except in compliance with the License.\n'
    ' You may obtain a copy of the License at\n'
    '\n'
    ' http://www.apache.org/licenses/LICENSE-2.0\n'
    '\n'
    ' Unless required by applicable law or agreed to in writing, software\n'
    ' distributed under the License is distributed on an "AS IS" BASIS,\n'
    ' WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n'
    ' See the License for the specific language governing permissions and\n'
    ' limitations under the License. See accompanying LICENSE file.\n'
)


def jre_8_install():
    """Install JRE 8 via winget if `java -version` is missing/odd, then set JAVA_HOME."""
    try:
        result = subprocess.run(['java', '-version'], stderr=subprocess.PIPE,
                                universal_newlines=True)
        # `java -version` reports on stderr, e.g. 'java version "1.8.0_...'.
        if not result.stderr.startswith('java version'):
            print(result.stderr)
            subprocess.run(['winget', 'install', 'Oracle.JavaRuntimeEnvironment'],
                           shell=True, check=True)
    except (OSError, subprocess.SubprocessError):
        # `java` is not on PATH at all -> install it.
        subprocess.run(['winget', 'install', 'Oracle.JavaRuntimeEnvironment'],
                       shell=True, check=True)
    print('JRE 8 install successfully')
    if os.environ.get('JAVA_HOME') != java_path:
        subprocess.run(['setx', 'JAVA_HOME', java_path], shell=True, check=True)
    print('JAVA_HOME environment variable is set')


def set_hadoop_home():
    """Set HADOOP_HOME and append %HADOOP_HOME%\\bin and \\sbin to the user Path."""
    if os.environ.get('HADOOP_HOME') == directory_path:
        print('HADOOP_HOME environment variable is set')
    else:
        subprocess.run(['setx', 'HADOOP_HOME', directory_path], shell=True, check=True)
        print('HADOOP_HOME environment variable is set')
    try:
        result = subprocess.run(['hadoop', '--version'], stderr=subprocess.PIPE,
                                universal_newlines=True)
        if result.stderr.startswith('java'):
            print('Path environment variable is set')
        else:
            print(result.stderr)
    except (OSError, subprocess.SubprocessError):
        # `hadoop` not runnable yet: add bin/sbin to Path.
        # BUG FIX: the original concatenated without a ';' separator, and the
        # second `setx Path` rebuilt from the unmodified Path, silently
        # discarding the \bin entry added just before.
        user_path = os.environ.get('Path', '')
        if rf'{directory_path}\bin' not in user_path:
            user_path = user_path.rstrip(';') + r';%HADOOP_HOME%\bin'
            subprocess.run(['setx', 'Path', user_path], shell=True, check=True)
        if rf'{directory_path}\sbin' not in user_path:
            user_path = user_path.rstrip(';') + r';%HADOOP_HOME%\sbin'
            print(f'Path=\n{user_path}')
            subprocess.run(['setx', 'Path', user_path], shell=True, check=True)
    print('Hadoop Path environment variable is set')


def set_java_library_path():
    """Point JAVA_LIBRARY_PATH at Hadoop's native libraries."""
    native_dir = rf'{directory_path}\lib\native'
    if os.environ.get('JAVA_LIBRARY_PATH') != native_dir:
        subprocess.run(['setx', 'JAVA_LIBRARY_PATH', native_dir], shell=True, check=True)
    print('JAVA_LIBRARY_PATH environment variable is set')


def set_hadoop_common_lib_native_dir():
    """Point HADOOP_COMMON_LIB_NATIVE_DIR at %HADOOP_HOME%\\lib\\native."""
    if os.environ.get('HADOOP_COMMON_LIB_NATIVE_DIR') != rf'{directory_path}\lib\native':
        subprocess.run(['setx', 'HADOOP_COMMON_LIB_NATIVE_DIR',
                        r'%HADOOP_HOME%\lib\native'], shell=True, check=True)
    print('HADOOP_COMMON_LIB_NATIVE_DIR environment variable is set')


def set_hive_home():
    """Set HIVE_HOME and append %HIVE_HOME%\\bin to the user Path."""
    if os.environ.get('HIVE_HOME') != hive_path:
        subprocess.run(['setx', 'HIVE_HOME', hive_path], shell=True, check=True)
    user_path = os.environ.get('Path', '')
    if rf'{hive_path}\bin' not in user_path:
        # collapse accidental ';;' before appending
        hive_bin_path = rf'{user_path.replace(";;", ";")};%HIVE_HOME%\bin'
        print(hive_bin_path)
        subprocess.run(['setx', 'Path', hive_bin_path], shell=True, check=True)
    print('HIVE_HOME environment variable is set')


def create_directories(base_dir):
    """Create data/namenode, data/datanode and tmp under *base_dir* (idempotent)."""
    for rel in ('data',
                os.path.join('data', 'namenode'),
                os.path.join('data', 'datanode'),
                'tmp'):
        os.makedirs(os.path.join(base_dir, rel), exist_ok=True)
    print("Directories namenode and datanode are created")


def _ensure_configuration_root(tree):
    """Return *tree*'s root, wrapping it in a <configuration> element if needed."""
    root = tree.getroot()
    if root.tag != 'configuration':
        new_root = ET.Element('configuration')
        new_root.append(root)
        tree._setroot(new_root)
        root = new_root
    return root


def _upsert_properties(root, props):
    """Create or update <property><name>/<value></property> children of *root*.

    BUG FIX: the original tested ``if not property_node`` — False for an
    *empty* existing element (ElementTree falsiness pitfall), which created
    duplicate <property> nodes; the extra properties were also appended
    unconditionally, so every re-run duplicated them.  Keying by property
    name makes the update idempotent.
    """
    by_name = {}
    for prop in root.findall('property'):
        name = prop.findtext('name')
        if name is not None:
            by_name[name] = prop
    for name, value in props.items():
        prop = by_name.get(name)
        if prop is None:
            prop = ET.SubElement(root, 'property')
            ET.SubElement(prop, 'name').text = name
            ET.SubElement(prop, 'value').text = value
        else:
            value_node = prop.find('value')
            if value_node is None:
                value_node = ET.SubElement(prop, 'value')
            value_node.text = value


def _prepend_license_header(file_path):
    """Rewrite *file_path* pretty-printed, with the configuration.xsl
    stylesheet PI and the Apache license comment before the root element."""
    parser = etree.XMLParser(remove_blank_text=True)
    tree = etree.parse(file_path, parser)
    root = tree.getroot()
    if root.tag != 'configuration':
        raise ValueError("Expected root element to be 'configuration'")
    # Only insert once, so re-running the script does not stack headers.
    if root.getprevious() is None:
        root.addprevious(etree.ProcessingInstruction(
            'xml-stylesheet', 'type="text/xsl" href="configuration.xsl"'))
        root.addprevious(etree.Comment(_LICENSE_TEXT))
    with open(file_path, 'wb') as f:
        etree.ElementTree(root).write(f, pretty_print=True,
                                      xml_declaration=True, encoding='utf-8')


def _update_site_xml(base_dir, filename, props):
    """Shared implementation for the four *-site.xml updaters.

    The original had this ~60-line sequence copy-pasted four times.
    """
    file_path = rf'{base_dir}\etc\hadoop\{filename}'
    if not os.path.exists(file_path):
        print(f"{file_path} does not exist")
        return
    tree = ET.parse(file_path)
    root = _ensure_configuration_root(tree)
    _upsert_properties(root, props)
    tree.write(file_path, encoding='utf-8', xml_declaration=True)
    _prepend_license_header(file_path)
    print(f"{filename} updated")


def update_core_site_xml(base_dir):
    """Point fs.defaultFS at the local single-node HDFS."""
    _update_site_xml(base_dir, 'core-site.xml',
                     {'fs.defaultFS': 'hdfs://localhost:9000'})


def update_hdfs_site_xml(base_dir):
    """Single replica plus local namenode/datanode storage directories."""
    _update_site_xml(base_dir, 'hdfs-site.xml', {
        'dfs.replication': '1',
        'dfs.namenode.name.dir': f'/{path[:-1]}/{hadoop}/data/namenode',
        'dfs.datanode.data.dir': f'/{path[:-1]}/{hadoop}/data/datanode',
    })


def update_mapred_site_xml(base_dir):
    """Run MapReduce on YARN."""
    _update_site_xml(base_dir, 'mapred-site.xml',
                     {'mapreduce.framework.name': 'yarn'})


def update_yarn_site_xml(base_dir):
    """Enable the shuffle service and cap nodemanager resources."""
    _update_site_xml(base_dir, 'yarn-site.xml', {
        'yarn.nodemanager.aux-services': 'mapreduce_shuffle',
        'yarn.nodemanager.aux-services.mapreduce.shuffle.class':
            'org.apache.hadoop.mapred.ShuffleHandler',
        'yarn.nodemanager.resource.memory-mb': '2048',
        'yarn.nodemanager.resource.cpu-vcores': '1',
    })


def hadoop_download_decompress(base_dir):
    """Download the Hadoop tarball from archive.apache.org and untar it to C:\\."""
    if os.path.isdir(base_dir):
        print(f"Hadoop {version} downloaded & decompressed")
        return
    print(f"Hadoop {version} started downloading")
    # Download to a .tmp name first so an interrupted download is never
    # mistaken for a complete archive.
    subprocess.run(['curl',
                    f'https://archive.apache.org/dist/hadoop/common/{hadoop}/{hadoop}.tar.gz',
                    '-o', f'{hadoop}.tar.gz.tmp'], shell=True, check=True)
    os.rename(f'{hadoop}.tar.gz.tmp', f'{hadoop}.tar.gz')
    print(f"Hadoop {version} downloaded")
    subprocess.run(['tar', '-xzvf', f'{hadoop}.tar.gz', '-C', path],
                   shell=True, check=True)
    if os.path.isdir(base_dir):
        print(f"Hadoop {version} decompressed")


def hive_download_decompress(base_dir):
    """Download the Hive binary tarball, untar it, and move it to C:\\."""
    if os.path.isdir(base_dir):
        print(f"Hive {hive_version} downloaded & decompressed")
        return
    print(f"Hive {hive_version} started downloading")
    subprocess.run(['curl',
                    f'http://archive.apache.org/dist/hive/hive-{hive_version}/{hive}.tar.gz',
                    '-o', f'{hive}.tar.gz.tmp'], shell=True, check=True)
    os.rename(f'{hive}.tar.gz.tmp', f'{hive}.tar.gz')
    print(f"Hive {hive_version} downloaded")
    # tar has no -C here in the original; extract locally then move into C:\.
    subprocess.run(['tar', '-xzvf', f'{hive}.tar.gz'], shell=True, check=True)
    move(hive, path)
    if os.path.isdir(base_dir):
        print(f"Hive {hive_version} decompressed")


def update_hadoop_env_cmd(base_dir):
    """Mirror the JRE to a space-free path and hard-code it in hadoop-env.cmd.

    hadoop-env.cmd cannot cope with spaces in JAVA_HOME ("C:\\Program Files"),
    so the JRE is first copied to C:\\jre-1.8\\.
    """
    file_path = rf'{base_dir}\etc\hadoop\hadoop-env.cmd'
    if not os.path.exists(file_path):
        print(f"{file_path} does not exist")
        return
    java_dir = 'C:\\jre-1.8\\'
    if not os.path.exists(java_dir):
        os.system(f'xcopy /s /i "{java_path}" {java_dir}')
    # Write to a temp file, then atomically swap it in.
    temp_file_path = file_path + '.tmp'
    with open(file_path, 'r') as file:
        file_content = file.read()
    new_content = file_content.replace('set JAVA_HOME=%JAVA_HOME%',
                                       f'set JAVA_HOME={java_dir}')
    with open(temp_file_path, 'w') as file:
        file.write(new_content)
    os.remove(file_path)
    os.rename(temp_file_path, file_path)
    print("hadoop-env.cmd updated")


def copy_jar_file(base_dir):
    """Copy the timelineservice jar one level up so YARN finds it.

    BUG FIX: the jar name is derived from the ``version`` constant instead of
    a hard-coded '3.1.3'.
    """
    jar = f'hadoop-yarn-server-timelineservice-{version}.jar'
    source = rf'{base_dir}\share\hadoop\yarn\timelineservice\{jar}'
    target = rf'{base_dir}\share\hadoop\yarn\{jar}'
    if os.path.isfile(source):
        if not os.path.exists(target):
            try:
                copyfile(source, target)
            except IOError as e:
                print("Unable to copy file. %s" % e)
            except Exception:
                print("Unexpected error:", sys.exc_info())
        print(f"'{jar}' copy done")
    else:
        print(f"{source} does not exist")


def create_hive_storage(base_dir):
    """Create the my_hive scratch directory under *base_dir* (idempotent)."""
    os.makedirs(os.path.join(base_dir, "my_hive"), exist_ok=True)
    print("Directory my_hive is created")


def mysql_download_install(base_dir):
    """Download the MySQL installer MSI; the user must run it manually."""
    if os.path.isdir(base_dir):
        print(f"MySQL {mysql_version} downloaded & installed")
        return
    print(f"MySQL {mysql_version} started downloading")
    subprocess.run(['curl', f'https://cdn.mysql.com//Downloads/MySQLInstaller/{mysql}',
                    '-o', f'{mysql}.tmp'], shell=True, check=True)
    os.rename(f'{mysql}.tmp', mysql)
    print(f"MySQL {mysql_version} downloaded")
    # BUG FIX: the original then executed ``move(hive, path)`` — a copy/paste
    # leftover from hive_download_decompress() that raised once the hive
    # folder had already been moved — and printed "decompressed" for an MSI.
    # The installer itself must be launched by hand:
    print(f'start {mysql}')


def set_mysql_home():
    """Set MYSQL_HOME; the Path update is printed but intentionally disabled."""
    if os.environ.get('MYSQL_HOME') != mysql_path:
        subprocess.run(['setx', 'MYSQL_HOME', mysql_path], shell=True, check=True)
        print('MYSQL_HOME environment variable is set')
    user_path = os.environ.get('Path', '')
    if rf'{mysql_path}\bin' not in user_path:
        mysql_path_dir = rf'{user_path}%MYSQL_HOME%\bin'
        print(mysql_path_dir)
        # setx left disabled as in the original — presumably because setx
        # truncates Path at 1024 chars; update the Path manually instead.
        # subprocess.run(['setx', 'Path', mysql_path_dir], shell=True, check=True)
    print('MySQL Path environment variable is set')


def _fix_hive_site(new_path):
    """Repair the broken character reference in hive-site.xml and localise it."""
    with open(new_path, 'r', encoding='utf-8') as file:
        xml_content = file.read()
    # hive-default.xml.template 3.1.2 ships an illegal character reference
    # (&#8; inside a <description>) that breaks every XML parser; strip it.
    # NOTE(review): the scraped original called re.sub(r'', ' ', ...) — an
    # empty pattern that would insert a space between every character and
    # destroy the file; '&#8;' is the documented culprit, confirm on upgrade.
    xml_content = xml_content.replace('&#8;', ' ')
    try:
        ET.fromstring(xml_content)
        print("Parse hive-site.xml successfully")
    except ET.ParseError as e:
        print(f"Parse hive-site.xml failed:{e}")
    with open(new_path, 'w', encoding='utf-8') as file:
        file.write(xml_content)
    tree = ET.parse(new_path)
    root = tree.getroot()
    # Only rewrite if the stock ${system:java.io.tmpdir} locations survive.
    needs_update = False
    for prop in root.findall('.//property'):
        value = prop.find('value')
        # value.text guarded against None (empty <value/>) — the original
        # would raise TypeError on `in None`.
        if value is not None and value.text and '${system:java.io.tmpdir}' in value.text:
            needs_update = True
            break
    if not needs_update:
        return
    new_values = {
        'hive.exec.local.scratchdir': 'C:/apache-hive-3.1.2-bin/my_hive/scratch_dir',
        'hive.downloaded.resources.dir':
            'C:/apache-hive-3.1.2-bin/my_hive/resources_dir/${hive.session.id}_resources',
        'hive.querylog.location': 'C:/apache-hive-3.1.2-bin/my_hive/querylog_dir',
        'javax.jdo.option.ConnectionURL':
            'jdbc:mysql://localhost:3306/hive?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true',
        'javax.jdo.option.ConnectionDriverName': 'com.mysql.cj.jdbc.Driver',
        'javax.jdo.option.ConnectionUserName': 'root',
        'javax.jdo.option.ConnectionPassword': 'root',
        'hive.metastore.schema.verification': 'false',
        'datanucleus.schema.autoCreateAll': 'true',
        'hive.server2.active.passive.ha.enable': 'true',
    }
    for prop in root.findall('.//property'):
        name = prop.find('name')
        if name is not None and name.text in new_values:
            value = prop.find('value')
            if value is None:
                value = ET.SubElement(prop, 'value')
            value.text = new_values[name.text]
    tree.write(new_path, encoding='utf-8', xml_declaration=True)
    print("Updated hive-site.xml successfully")


def _fix_hive_env(new_path):
    """Uncomment and localise HADOOP_HOME / HIVE_CONF_DIR / HIVE_AUX_JARS_PATH."""
    replacements = (
        ("# HADOOP_HOME=${bin}/../../hadoop",
         "export HADOOP_HOME=C:\\hadoop-3.1.3"),
        ("# export HIVE_CONF_DIR=",
         "export HIVE_CONF_DIR=C:\\apache-hive-3.1.2-bin\\conf"),
        ("# export HIVE_AUX_JARS_PATH=",
         "export HIVE_AUX_JARS_PATH=C:\\apache-hive-3.1.2-bin\\lib"),
    )
    with open(new_path, 'r') as file:
        lines = file.readlines()
    new_lines = []
    for line in lines:
        for old, new in replacements:
            if old in line:
                line = new + "\n"
                break
        new_lines.append(line)
    with open(new_path, 'w') as file:
        file.writelines(new_lines)
    print("Updated hive-env.sh successfully")


def _fix_hive_log4j(new_path):
    """Redirect the Hive log directory from the system tmpdir to my_hive."""
    with open(new_path, 'r', encoding='utf-8') as file:
        content = file.read()
    if '${sys:java.io.tmpdir}' in content:
        new_content = content.replace(
            'property.hive.log.dir = ${sys:java.io.tmpdir}/${sys:user.name}',
            'property.hive.log.dir = C:\\apache-hive-3.1.2-bin\\my_hive\\log_dir')
        with open(new_path, 'w', encoding='utf-8') as file:
            file.write(new_content)


def config_hive_xml_files(base_dir):
    """Instantiate Hive's conf templates and localise them for Windows.

    Each template is copied only if its target does not exist yet, then the
    freshly-copied file is patched by the matching private helper.
    """
    base_path = rf"{base_dir}\conf"
    # template name -> live config name
    file_map = {
        "hive-default.xml.template": "hive-site.xml",
        "hive-env.sh.template": "hive-env.sh",
        "hive-exec-log4j2.properties.template": "hive-exec-log4j2.properties",
        "hive-log4j2.properties.template": "hive-log4j2.properties",
    }
    for old_name, new_name in file_map.items():
        old_path = os.path.join(base_path, old_name)
        new_path = os.path.join(base_path, new_name)
        if not (os.path.isfile(old_path) and not os.path.isfile(new_path)):
            continue
        shutil.copy2(old_path, new_path)  # copy2 keeps metadata
        print(f"Copied {old_name} to {new_name}")
        if new_name == "hive-site.xml":
            _fix_hive_site(new_path)
        elif new_name == 'hive-env.sh':
            _fix_hive_env(new_path)
        elif new_name == 'hive-log4j2.properties':
            _fix_hive_log4j(new_path)
        # NOTE: the original also had a branch for
        # 'hive-log4j2.properties.template', but that string is never a
        # *value* of file_map — unreachable dead code, removed.


def download_mysql_connector():
    """Fetch the MySQL JDBC driver into Hive's lib directory."""
    target = rf'{hive_path}\lib\mysql-connector-java-8.0.30.jar'
    if not os.path.exists(target):
        print('MySQL Connector 8.0.30 started downloading')
        subprocess.run(['curl',
                        'https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar',
                        '-o', 'mysql-connector-java-8.0.30.jar.tmp'],
                       shell=True, check=True)
        move('mysql-connector-java-8.0.30.jar.tmp', target)
        print('MySQL Connector 8.0.30 downloaded')


def download_hive_cmd_files():
    """Hive 3.x ships no Windows .cmd scripts; borrow them from the Hive 2.2.0 source."""
    src_dir = r".\hive-2.2.0\bin"
    dst_dir = rf"{hive_path}\bin"
    if os.path.exists(os.path.join(dst_dir, "hive.cmd")):
        return
    # Clean any partial leftovers from a previous attempt.
    for item in ('apache-hive-2.2.0-src.tar.gz.tmp',
                 'apache-hive-2.2.0-src.tar.gz',
                 'hive-2.2.0'):
        remove_file_or_dir(item)
    print('Hive cmd files started downloading')
    subprocess.run(['curl',
                    'https://archive.apache.org/dist/hive/hive-2.2.0/apache-hive-2.2.0-src.tar.gz',
                    '-o', 'apache-hive-2.2.0-src.tar.gz.tmp'], shell=True, check=True)
    os.rename('apache-hive-2.2.0-src.tar.gz.tmp', 'apache-hive-2.2.0-src.tar.gz')
    subprocess.run(['tar', '-xzvf', 'apache-hive-2.2.0-src.tar.gz'],
                   shell=True, check=True)
    print('Hive cmd files downloaded')
    # Copy every *.cmd from bin, bin\ext and bin\ext\util (the original had
    # this loop copy-pasted three times).
    for rel in ('', 'ext', os.path.join('ext', 'util')):
        src_sub = os.path.join(src_dir, rel)
        dst_sub = os.path.join(dst_dir, rel)
        os.makedirs(dst_sub, exist_ok=True)  # robustness: ensure target exists
        for filename in os.listdir(src_sub):
            if filename.endswith(".cmd"):
                shutil.copy2(os.path.join(src_sub, filename),
                             os.path.join(dst_sub, filename))
    print(f"Hive cmd files copied to Hive {hive_version}")


def remove_file_or_dir(target):
    """Delete *target*, whether it is a file or a directory tree."""
    if os.path.exists(target):
        if os.path.isfile(target):
            os.remove(target)
            print(f"File {target} removed.")
        elif os.path.isdir(target):
            shutil.rmtree(target)
            print(f"Directory {target} removed.")


def main():
    """Run all setup steps in order and print the remaining manual commands."""
    jre_8_install()
    hadoop_download_decompress(directory_path)
    set_hadoop_home()
    set_java_library_path()
    set_hadoop_common_lib_native_dir()
    create_directories(directory_path)
    update_core_site_xml(directory_path)
    update_hdfs_site_xml(directory_path)
    update_mapred_site_xml(directory_path)
    update_yarn_site_xml(directory_path)
    update_hadoop_env_cmd(directory_path)
    print('hdfs namenode -format')
    print(f'Download apache-{hadoop}-winutils.zip from https://github.com/s911415/apache-hadoop-3.1.3-winutils or https://gitcode.com/weixin_307779131/apache-hadoop-3.1.3-winutils/tree/master or https://download.csdn.net/download/weixin_43576022/12381058')
    print(rf'decompress and copy the files to {directory_path}\bin and copy "hadoop.dll" and "hdfs.dll" to {directory_path}\lib\native')
    copy_jar_file(directory_path)
    print('start-all.cmd')
    print('http://localhost:8088/cluster')
    print('http://localhost:9870')
    hive_download_decompress(hive_path)
    set_hive_home()
    create_hive_storage(hive_path)
    config_hive_xml_files(hive_path)
    print('hadoop fs -mkdir /tmp')
    print('hadoop fs -mkdir /user/')
    print('hadoop fs -mkdir /user/hive/')
    print('hadoop fs -mkdir /user/hive/warehouse')
    print('hadoop fs -chmod g+w /tmp')
    print('hadoop fs -chmod g+w /user/hive/warehouse')
    mysql_download_install(mysql_path)
    set_mysql_home()
    download_mysql_connector()
    download_hive_cmd_files()
    print("""ALTER USER 'root'@'localhost' IDENTIFIED BY 'root';
FLUSH PRIVILEGES;
CREATE DATABASE IF NOT EXISTS hive CHARACTER SET latin1 COLLATE latin1_swedish_ci;
USE hive;
GRANT ALL PRIVILEGES ON *.* TO 'root'@'localhost' WITH GRANT OPTION;
FLUSH PRIVILEGES;""")
    print('hive --service schematool -dbType mysql -initSchema')
    print('start-all.cmd')
    print('hive --service metastore')
    print('hive --service hiveserver2')
    print('http://localhost:10002/')
    print('''hive
create database test;
show databases;
use test;
create table t (a int);
insert into t values(a);
select * from t;''')


if __name__ == '__main__':
    main()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。