Java | MR (Hadoop Writable) |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
map | MapWritable |
array | ArrayWritable |
//Java package that contains the Hadoop data types
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
...
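As a quick illustration of the mapping in the table above, here is a minimal standalone sketch (the class name WritableDemo is just for illustration): Hadoop wrapper objects are created around Java values, updated in place with set(), and unwrapped with get()/toString().

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        //Wrap plain Java values in their Hadoop counterparts
        IntWritable count = new IntWritable(1);
        Text word = new Text("hadoop");

        //Hadoop types are mutable: reuse the same object via set(), as the Mapper below does
        count.set(2);
        word.set("spark");

        //Unwrap back into plain Java values
        int javaInt = count.get();
        String javaString = word.toString();
        System.out.println(javaString + " -> " + javaInt);  //prints: spark -> 2
    }
}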
//Format of the input data to be processed (one comma-separated line per record)
hadoop,spark,java
kudu,hadoop,hbase
zookeeper,flink,sparkstreaming
hive,flink
hadoop
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hadoop-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>ali-maven</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <!-- The Hadoop dependency versions must match the Hadoop version running on the cluster -->
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
</project>
log4j.rootLogger=INFO, stdout, D

# Console Appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern= %d{hh:mm:ss,SSS} [%t] %-5p %c %x - %m%n

# Custom tweaks
log4j.logger.com.codahale.metrics=WARN
log4j.logger.com.ryantenney=WARN
log4j.logger.com.zaxxer=WARN
log4j.logger.org.apache=WARN
log4j.logger.org.hibernate=WARN
log4j.logger.org.hibernate.engine.internal=WARN
log4j.logger.org.hibernate.validator=WARN
log4j.logger.org.springframework=WARN
log4j.logger.org.springframework.web=WARN
log4j.logger.org.springframework.security=WARN

# log file
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = ..//log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
package com.csd;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    /*
    This class extends Mapper. The four type parameters <LongWritable,Text,Text,IntWritable> are:
    1. LongWritable: the byte offset of the line being read from the file (just write it like this for now; changing it has not come up yet)
    2. Text: the type of the input data the map phase reads from HDFS; since we are processing strings, Text is used
    3. Text: the key type of the map output key-value pairs; this map emits ("hadoop",1), ("spark",1), ..., so the key type is Text
    4. IntWritable: the value type of the map output key-value pairs; this map always emits 1 as the value, so the value type is IntWritable
    */
    //The two fields below are the key and value emitted by map; they are declared here so they are not re-instantiated for every record
    //Hadoop types are updated via their set() method, e.g. write_key.set("hadoop")
    private static Text write_key = new Text();
    private static IntWritable write_value = new IntWritable();

    //Override map(); the map processing logic goes here
    //LongWritable offset and Text value correspond to the first two type parameters of Mapper<>
    @Override
    protected void map(LongWritable offset, Text value, Context context) throws IOException, InterruptedException {
        //Convert the Hadoop Text into a Java String; value is one line of the input file, e.g. value=hadoop,spark,java
        String line = value.toString();
        //Split the line
        String[] words = line.split(",");
        //Turn the words of one line into multiple (key,value) pairs
        for(String word:words){
            write_key.set(word);
            write_value.set(1);
            //Write each (key,value) pair out for the reducer to read later
            context.write(write_key,write_value);
        }
    }
}
package com.csd;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    private static IntWritable tmp_val = new IntWritable();
    /*
    This class extends Reducer. The type parameters of Reducer<Text,IntWritable,Text,IntWritable> are:
    1. The key type of the reduce input, i.e. the key type of the map output
    2. The value type of the reduce input, i.e. the value type of the map output
    3. The key type of the reduce output
    4. The value type of the reduce output
    */
    //Override reduce(); the reduce processing logic goes here
    //Text key and Iterable<IntWritable> values are:
    //key: the key type emitted by the map side
    //values: an Iterable whose element type is the value type emitted by the map side. The reducer does not receive the raw map output; the framework first sorts the keys lexicographically and groups all values of the same key into one Iterable, so the reducer effectively sees pairs like ("hadoop",[1,1,1,1]), ("spark",[1,1,1]), ...
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        //Iterate over the values to count how many times this key occurred
        for(IntWritable val:values){
            sum += val.get();
        }
        tmp_val.set(sum);
        //Write the count to the final output file
        context.write(key,tmp_val);
    }
}
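To make the comment above concrete for the sample input of this post, after sorting and grouping the reducer would be invoked once per key with (key, value-list) pairs roughly like these (worked out by hand from the five input lines):

("flink", [1, 1])
("hadoop", [1, 1, 1])
("hbase", [1])
...
("zookeeper", [1])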
There are several ways to run the MR program.
//Method 1
package com.csd;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;

public class WordCountDriver{
    //Local filesystem paths
    private static String input_path = "/home/dong/Desktop/a/word_count.txt";
    private static String output_path = "/home/dong/Desktop/a/b";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //BasicConfigurator sets up log output
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input_path));
        FileOutputFormat.setOutputPath(job,new Path(output_path));
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}
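Because the reduce logic here is a plain associative sum, the same reducer class could also be registered as a combiner so that partial sums are computed on the map side before the shuffle. This is an optional tweak, not part of the original driver; it would be a one-line addition inside main():

//Optional: pre-aggregate counts on the map side; safe here because summing is associative and commutative
job.setCombinerClass(WordCountReducer.class);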
//Method 2
package com.csd;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;

public class WordCountDriver{
    //HDFS paths
    private static String HDFS_PATH = "hdfs://dong:9000";
    private static String input_path = "hdfs:///data/test";//"/home/dong/Desktop/a/word_count.txt";
    private static String output_path = "hdfs:///data/result";//"/home/dong/Desktop/a/b";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        //Switch the default filesystem to HDFS
        conf.set("fs.defaultFS", HDFS_PATH);
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input_path));
        FileOutputFormat.setOutputPath(job,new Path(output_path));
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}
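One thing to watch for with both drivers: FileOutputFormat refuses to start the job if the output directory already exists, so a second run fails until the old result is removed. A small optional addition (not in the original code) that deletes a leftover output directory before submitting the job could look like the sketch below, placed in main() after the Configuration is created (it needs one extra import, org.apache.hadoop.fs.FileSystem):

//Delete the output directory if it is left over from a previous run
FileSystem fs = FileSystem.get(conf);
Path out = new Path(output_path);
if (fs.exists(out)) {
    fs.delete(out, true);  //true = delete recursively
}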
Method 3: package the code from Method 2 into a jar.
In the IDE, open the Maven panel on the right --> Lifecycle --> click package to build the jar automatically.
On success, the build log shows BUILD SUCCESS and prints the path of the packaged jar.
//Path of the built jar
[INFO] Building jar: /home/dong/code/java/hadoop/target/hadoop-demo-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
How to run it
Copy the packaged jar to the cluster and submit it to YARN from the shell:
//xxxx.jar is the path of the packaged jar
//com.csd.WordCountDriver specifies the class that contains the main method
hadoop jar xxxx.jar com.csd.WordCountDriver
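After the job finishes, the result lives in the output directory (hdfs:///data/result for Method 2) as one or more part-r-NNNNN files, typically part-r-00000 with a single reducer, which can be viewed with hdfs dfs -cat. With the default TextOutputFormat each line is a word and its count separated by a tab, so for the sample input above the combined result should look roughly like this (counts worked out by hand from the five sample lines, keys in sorted order):

flink	2
hadoop	3
hbase	1
hive	1
java	1
kudu	1
spark	1
sparkstreaming	1
zookeeper	1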