赞
踩
目录
主要用yothon实现:模拟生成新能源车辆数据(当然,其他语言也是一样的,思路都差不多,只是语法稍有改变),主要用于测试一些代码或一些测试步骤,当然对于像我这样的小白用来练手也是一个不错的选择。
高可靠性:Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素或存储出现故障,也不会导致数据的丢失
高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点(在执行时可以动态增加节点与删除节点)
高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度
高容错性:能够自动将失败的任务重新分配
不适合低延时数据访问,不能用来做实时运算
无法高效处理大量小文件
不支持并发写入、不支持文件随机修改。
第三题:新能源车辆数据分析
1、编写一个MapReduce程序,每天凌晨2点统计前一天的新能源车辆数据是否有重复数据(所有字段都相同的两条数据则认为是重复数据),并将有重复数据的车辆信息写入到第一题的Mysql数据库的repeat表中(表必须包含字段:数据生成日期、车架号、重复次数。比如2023-01-01这一天,vin0001有两条重复数据,则mysql表中的记录则为2023-01-01、vin0001、2);
2、编写一个MapReduce程序,每天凌晨3点统计前一天上传的车辆数据总条数,并将统计结果写到第一题的Mysql数据库的total_num表中(表必须包含字段:数据生成日期、车辆数据总条数)。
3、编写一个MapReduce程序,每天凌晨4点统计前一天上传的车辆数据中非当天的数据总条数,并将统计结果写到第一题的Mysql数据库的other_date_num表中(表字段必须包含:数据生成日期、非当天的数据总条数);
4、编写一个程序,每天凌晨5点将前一天的新能源车辆明细数据写入到第一题的Hbase集群中。
- import java.text.SimpleDateFormat;
- import java.util.Calendar;
- import java.util.GregorianCalendar;
- import java.util.HashMap;
-
- public class SumData {
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- //1 创建一个配置对象
- Configuration conf = new Configuration();
- conf.set("fs.defaultFS","hdfs://20210322031-master:9000");
- System.setProperty("HADOOP_USER_NAME","root");
-
- SimpleDateFormat tempDate = new SimpleDateFormat("yyyy-MM-dd");
- Calendar calendar = new GregorianCalendar();
- calendar.add(calendar.DATE, -1);
- String Timefile= tempDate.format(calendar.getTime());
- String input = "/pog/"+Timefile+"/*.json";
- String out = "/pog2/total_num/"+Timefile;
-
- Job job = Job.getInstance(conf,"json to mysql");
- // 设置jar包
- job.setJarByClass(SumData.class);
- //设置MyMapper2类,设置MyReducer2类
- job.setMapperClass(MyMapper2.class);
- job.setReducerClass(MyReducer2.class);
- //设置Text的输出的key的数据类型,设置Text的输出的value的数据类型,
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- //设置NullWritable输出的key的数据类型,设置NullWritable的输出的value的类型
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(NullWritable.class);
- //指定要处理的文件
- FileInputFormat.addInputPath(job,new Path(input));
- //设置输出路径
- FileOutputFormat.setOutputPath(job,new Path(out));
-
- job.waitForCompletion(true);
- }
- static class MyMapper2 extends Mapper<LongWritable, Text,Text, Text> {
- @Override
- protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
- Gson gson = new Gson();
- Datas2 datas = gson.fromJson(String.valueOf(value),Datas2.class);
- String s = StringUtils.substringBefore(datas.collectTime,":");
- context.write(new Text(s),value);
-
- }
- }
- static class MyReducer2 extends Reducer<Text,Text, NullWritable,NullWritable> {
- private Connection conn;
-
- public MyReducer2() {
- try {
- Class.forName("com.mysql.cj.jdbc.Driver");
- String url = "jdbc:mysql://192.168.53.128:3306/pog?serverTimezone=UTC&characterEncoding=utf-8&useSSL=false";
- this.conn = DriverManager.getConnection(url,"root","Aa123-456");
- } catch (Throwable e) {
- Throwables.getStackTraceAsString(e);
- }
- }
- //
- @Override
- protected void cleanup(Reducer<Text, Text, NullWritable, NullWritable>.Context context) throws IOException, InterruptedException {
- try {
- this.conn.close();
- } catch (Throwable e) {
- Throwables.getStackTraceAsString(e);
- }
- }
-
- @Override
- protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- // System.out.println("######MyReducer reduce....");
- int sum=0;
- HashMap<Text, Integer> map = new HashMap<>();
- for (Text val : values) {
- if (map.containsKey(val)) {
- map.put(val, map.get(val) + 1);
- sum+=1;
- } else {
- map.put(val, 1);
- sum+=1;
- }
- }
- //total_num是MYSQL中的表名,values(?,?)匹配key键sum键
-
- String sql = "insert into `total_num` values(?,?)";
- try {
- PreparedStatement preparedStatement = this.conn.prepareStatement(sql);
- preparedStatement.setString(1, String.valueOf(key));
- preparedStatement.setInt(2, sum);
- preparedStatement.execute();
- } catch (Throwable e) {
- Throwables.getStackTraceAsString(e);
- }
- }
- }
- //collectTime和vin是表中的字段名可以根据需求添加
- public static class Datas2 {
- String collectTime;
- String vin;
-
- public Datas2(String collectTime, String vin) {
- this.collectTime = collectTime;
- this.vin = vin;
- }
- }
- }
代码运行效果
将需要统计的文件上传到hdfs的/pog/"+Timefile+"/*.json中,Timefile的是自定义的可以取为2023-06-08,后缀名必须是json类型。
最后在MYSQL查看total_num表的统计内容
编写一个MdaapReduce程序,每天凌晨3点统计前一天上传的车辆数据总条数,并将统计结果写到Mysql数据库的total_num表中(表必须包含字段:数据生成日期、车辆数据总条数)中出现了很多小问题比如应该怎么把数据存储起来统计,比如怎么把数据进行统计等等,最后通过一段时间的研究终于把问题解决了。整体流程就是把需要统计的数据传到hdfs上,然后连接hdfs读取hdfs上需要统计的文件进行统计,最后把统计的结果传入到mysql中。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。