赞
踩
在 Ubuntu 中:
sudo apt update
sudo apt install openjdk-8-jdk
在 CentOS 中:
sudo yum install java-1.8.0-openjdk
验证 Java 安装:
java -version
访问 Hadoop 官网 下载最新版本的 Hadoop。
wget https://downloads.apache.org/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz
tar -xzvf hadoop-3.3.1.tar.gz
mv hadoop-3.3.1 /usr/local/hadoop
编辑 ~/.bashrc
文件,添加以下内容:
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
应用更改:
source ~/.bashrc
路径:$HADOOP_HOME/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
路径:$HADOOP_HOME/etc/hadoop/hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///usr/local/hadoop/hadoop_data/hdfs/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///usr/local/hadoop/hadoop_data/hdfs/datanode</value>
</property>
</configuration>
复制模板文件并编辑:
cp $HADOOP_HOME/etc/hadoop/mapred-site.xml.template $HADOOP_HOME/etc/hadoop/mapred-site.xml
路径:$HADOOP_HOME/etc/hadoop/mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
路径:$HADOOP_HOME/etc/hadoop/yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
hdfs namenode -format
start-dfs.sh
start-yarn.sh
访问 Hadoop Web 界面:
在主节点上生成 SSH 密钥:
ssh-keygen -t rsa
将公钥复制到所有从节点:
ssh-copy-id user@datanode1
ssh-copy-id user@datanode2
在所有节点上安装 Hadoop,步骤与单机安装相同。
在所有节点上配置相同的 core-site.xml:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://namenode:9000</value>
</property>
</configuration>
在所有节点上配置相同的 hdfs-site.xml:
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///usr/local/hadoop/hadoop_data/hdfs/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///usr/local/hadoop/hadoop_data/hdfs/datanode</value>
</property>
</configuration>
在所有节点上配置相同的 mapred-site.xml:
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
在所有节点上配置相同的 yarn-site.xml:
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
在 NameNode 上,编辑 $HADOOP_HOME/etc/hadoop/slaves
文件,添加所有 DataNode 的主机名:
datanode1
datanode2
在 NameNode 上执行:
start-dfs.sh
start-yarn.sh
访问 Namenode 和 ResourceManager 的 Web 界面,确保所有节点正常运行。
在 HDFS 中创建一个目录,并上传一个文本文件:
hdfs dfs -mkdir -p /user/hadoop/input
hdfs dfs -put localfile.txt /user/hadoop/input
以下是一个简单的 Word Count Java 程序:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { String[] tokens = value.toString().split("\\s+"); for (String token : tokens) { word.set(token); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); File InputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
编译程序:
javac -classpath `hadoop classpath` -d wordcount_classes WordCount.java
jar -cvf wordcount.jar -C wordcount_classes/ .
运行程序:
hadoop jar wordcount.jar WordCount /user/hadoop/input /user/hadoop/output
当然可以使用 Python 来实现 Word Count 的 Hadoop MapReduce 程序。Python 提供了一个名为 Hadoop Streaming 的工具,可以通过管道方式使得我们可以使用 Python、Perl、Ruby 等语言来编写 Map 和 Reduce 函数。
下面是使用 Python 实现的 Word Count 示例。
确保你的 Hadoop 环境支持 Hadoop Streaming,可以通过以下命令查看:
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
如果没有报错,那么你的 Hadoop 支持 Streaming。
Mapper 的任务是读取输入文件的每一行,将每个单词输出为一个键值对 (word, 1)
。
#!/usr/bin/env python
import sys
# 读取标准输入
for line in sys.stdin:
# 去除前后空格并分割成单词
words = line.strip().split()
for word in words:
# 输出键值对
print(f"{word}\t1")
保存为 mapper.py
。
Reducer 的任务是汇总 Mapper 的输出,统计每个单词出现的次数。
#!/usr/bin/env python import sys current_word = None current_count = 0 word = None # 从标准输入读取数据 for line in sys.stdin: # 去除前后空格并解析输入 line = line.strip() word, count = line.split('\t', 1) # 将 count 转换为 int try: count = int(count) except ValueError: continue # 检查当前单词是否与之前的单词相同 if current_word == word: current_count += count else: if current_word: # 输出当前单词的计数 print(f"{current_word}\t{current_count}") current_word = word current_count = count # 输出最后一个单词的计数 if current_word == word: print(f"{current_word}\t{current_count}")
保存为 reducer.py
。
确保这两个 Python 脚本具有可执行权限:
chmod +x mapper.py
chmod +x reducer.py
确保 HDFS 已经运行,创建输入目录并上传数据文件:
hdfs dfs -mkdir -p /user/hadoop/input
hdfs dfs -put localfile.txt /user/hadoop/input
使用 Hadoop Streaming 工具运行 MapReduce 作业:
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar \
-input /user/hadoop/input \
-output /user/hadoop/output \
-mapper mapper.py \
-reducer reducer.py \
-file mapper.py \
-file reducer.py
参数说明:
-input
:指定输入数据所在的 HDFS 目录。-output
:指定输出结果存储的 HDFS 目录。-mapper
:指定 Mapper 的执行脚本。-reducer
:指定 Reducer 的执行脚本。-file
:将本地文件发送到 Hadoop 分布式缓存中,以便在集群节点上执行。hdfs dfs -cat /user/hadoop/output/part-00000
假设 localfile.txt
内容如下:
Hello Hadoop
Hello Python
Hello World
运行以上命令后,输出结果可能如下:
Hadoop 1
Hello 3
Python 1
World 1
通过以上步骤,我们成功地使用 Python 实现了一个简单的 Hadoop Word Count 程序。Hadoop Streaming 提供了极大的灵活性,可以使用任意支持标准输入输出的编程语言来实现 MapReduce 作业。这使得开发者能够利用熟悉的编程语言进行大规模数据处理。
如果在开发过程中遇到任何问题,请确保查看 Hadoop 和 Python 的错误日志,以便更快地定位问题并进行调试。
hdfs dfs -cat /user/hadoop/output/part-r-00000
通过以上步骤,您可以成功部署 Hadoop 单机和集群环境,并运行简单的 MapReduce程序进行数据处理。在实际生产环境中,还需要根据具体需求进行进一步优化和调整。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。