赞
踩
本片文章需要一定的java jdbc 连接数据库的经验。
目录
错误:java.lang.UnsatisfiedLinkError
错误:java.lang.ClassCastException
HDFS的全称为Hadoop Distributed File System,是Hadoop分布式文件系统。是指被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统。
HDFS能提供高吞吐量的数据访问,会对大文件进行切块,且每个切块都会备份,保证数据的高可用,适合那些有着超大数据集的应用程序。
MapReduce是一种分布式技术框架,分布式计算是将该应用分解成许多小的部分,分配给多台计算机进行处理,以达到提交计算效率的目的,减少大规模数据计算的时间。
HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式数据存储系统”。HBase在Hadoop之上提供了类似于Bigtable的能力。HBase是Apache的Hadoop项目的子项目。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。
现在我们有如下json格式的数据集(条数不限):
{"车架号":"vin2364235","行驶总里程":762,"速度":0,"汽车状态":"停泊中","充电状态":"待充电","剩余电量":"93%","电量预警":"电量充足","生成时间":"2023-06-26 11:01:16"}
数据集中有若干条的重复数据(所有的字段值相同即视为重复),需要统计。
因为 Reduce阶段会将完全相同key值分为一组,所有Map阶段将整条记录作为key值,value定义为1,方便后续统计。
main方法
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-
- Configuration conf = new Configuration();
- conf.set("fs.defaultFS","hdfs://20210322053-master:9000");
- Job jod = Job.getInstance(conf,"repeat2");
- jod.setJarByClass(part3_2.class);
- jod.setMapperClass(part3_2.map.class);
- jod.setReducerClass(part3_2.reducer.class);
-
- jod.setMapOutputKeyClass(Text.class);
- jod.setMapOutputValueClass(IntWritable.class);
-
- jod.setOutputKeyClass(Text.class);
- jod.setOutputValueClass(IntWritable.class);
-
- FileInputFormat.setInputPaths(jod,new Path(args[0]));
- FileOutputFormat.setOutputPath(jod,new Path(args[1]));
-
- System.exit(jod.waitForCompletion(true)?0:1);
- }
Map类
- public static class map extends Mapper<Object, Text,Text,IntWritable> {
- @Override
- protected void map(Object key, Text value,Context context) throws IOException, InterruptedException {
- context.write(value,new IntWritable(1));
- }
- }
Reduce类
在无参构造方法中定义jdbc的连接。使用Gson类将字符串的数据转换为Map字典方便后续的调用,由于文件是一行一行读取的,有可能会读取到空行,导致Gson为空。所以在输出前进行判断。
- public static class reducer extends Reducer<Text,IntWritable,Text,IntWritable> {
- private Connection conn;
-
- public reducer() {
- try {
- Class.forName("com.mysql.cj.jdbc.Driver");
- String url = "jdbc:mysql://20210322053-master:3306/hadoop?serverTimezone=UTC&useSSL=false&characterEncoding=utf-8";
- conn = DriverManager.getConnection(url, "root", "123456");
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
- int total = 0;
- for(IntWritable one :values){
- total += one.get();
- }
- if (total>1) {
- Gson gson = new Gson();
- Map<String,String> data = gson.fromJson(key.toString(),Map.class);
- // 创建jdbc连接
- String sql = "insert into `repeat` values(?,?,?)";
- PreparedStatement pc = null;
- try {
- pc = conn.prepareStatement(sql);
- pc.setString(1,data.get("生成时间"));
- pc.setString(2,data.get("车架号"));
- pc.setInt(3,total);
- pc.execute();
- context.write(key,new IntWritable(total));
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
-
- }
- }
- }
main
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
- conf.set("fs.defaultFS","hdfs://20210322053-master:9000");
- Job jod = Job.getInstance(conf,"repeat4");
- jod.setJarByClass(part3_4.class);
- jod.setMapperClass(part3_4.map .class);
- jod.setReducerClass(part3_4.reducer .class);
-
- jod.setMapOutputKeyClass(Text .class);
- jod.setMapOutputValueClass(IntWritable .class);
-
- jod.setOutputKeyClass(Text.class);
- jod.setOutputValueClass(IntWritable.class);
-
- FileInputFormat.setInputPaths(jod,new Path(args[0]));
- FileOutputFormat.setOutputPath(jod,new Path(args[1]));
-
- System.exit(jod.waitForCompletion(true)?0:1);
- }
Map类
- public static class map extends Mapper<Object, Text,Text,IntWritable> {
- @Override
- protected void map(Object key, Text value,Context context) throws IOException, InterruptedException {
- context.write(value,new IntWritable(1));
- }
- }
Reduce类
在无参构造方法中定义Hbase数据库连接。数据列数较多时可采用遍历key值的方法将列名提取使用。
- public static class reducer extends Reducer<Text,IntWritable,Text,IntWritable> {
- private Connection conn;
-
- public reducer() {
- try {
- Configuration conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum","20210322053-master:2181");
- conn = ConnectionFactory.createConnection(conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
- Gson gson = new Gson();
- Map<String,String> data = gson.fromJson(key.toString(),Map.class);
- if (data!=null){
- String time = data.get("生成时间").split(" ")[0];
- Table car = conn.getTable(TableName.valueOf("car_data"));
- Put put = new Put((time+data.get("车架号")).getBytes());
- for (String x:data.keySet()) {
- String value = String.valueOf(data.get(x));
- put.addColumn("data".getBytes(),x.getBytes(),value.getBytes());
- }
- car.put(put);
- car.close();
- context.write(key,null);
- }
- }
- }
'boolean org.apache.hadoop.io.nativeio.NativeIO$Window
在C:\Windows 下添加hadoop.dll
下载windows版本的hadoop,将bin与sbin配置在PATH环境变量中。
java.lang.String cannot be cast to java.lang.Double
1
.直接使用tosting的方式
String str = entry.value().toString();
2
.使用String类的静态方法valueOf()
String str = String.valueOf(entry.value());
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。