With the continued growth of big data, Hadoop has become an increasingly important technology, and many people have set out to learn it. For this course project, we use MongoDB for storage, JavaWeb for the frontend, ECharts for visualization, and Kettle and pandas for data cleaning, with MapReduce underneath doing the heavy computation.
MongoDB: high-performance, easy to deploy, and easy to use; storing data is very convenient. Its main features: collection-oriented storage, well suited to object-shaped data; schema freedom; dynamic queries; and full indexing, including on embedded objects.
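As a sketch of how the computed results could land in MongoDB (a minimal example using pymongo; the database and collection names and the document shape are assumptions, not taken from the original project, and the values shown are placeholders):

```python
from pymongo import MongoClient

# Minimal sketch, assuming a local MongoDB instance; "weather" and
# "continent_avg_temp" are made-up names for illustration.
client = MongoClient("mongodb://localhost:27017/")
coll = client["weather"]["continent_avg_temp"]

# Schema-free insert: documents are plain dicts, no table definition needed.
coll.insert_many([
    {"region": "Africa", "year": 1995, "avg_temp_f": 75.3},  # placeholder values
    {"region": "Asia", "year": 1995, "avg_temp_f": 68.9},
])

# Dynamic query with an index-friendly filter.
for doc in coll.find({"region": "Africa", "year": {"$gte": 1995}}):
    print(doc)
```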
JavaWeb: a personal preference for Java; Node.js would arguably be the better choice here.
ECharts: a visualization library with many chart components; simple and convenient to work with.
Kettle: Pentaho Data Integration, written in Java and cross-platform. Its features include 100% code-free, drag-and-drop development of ETL pipelines; connectivity to traditional databases, files, big-data platforms, APIs, and streaming sources; and support for embedding machine-learning steps in an ETL pipeline.
pandas: a third-party Python library, and a very handy tool for simple data work.
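For instance, the same -99 sentinel filtering that the Mapper below performs could be done up front in pandas. A minimal sketch, assuming the file follows the usual layout of the Kaggle city_temperature.csv dataset and has no header row (which is what the Mapper's parsing implies):

```python
import pandas as pd

# Assumed column layout of city_temperature.csv (no header row).
cols = ["Region", "Country", "State", "City", "Month", "Day", "Year", "AvgTemperature"]
df = pd.read_csv(r"D:\desk\city_temperature.csv", names=cols, header=None)

# -99 marks a missing reading in this dataset; drop those rows.
clean = df[df["AvgTemperature"] != -99]
clean.to_csv(r"D:\desk\city_temperature_clean.csv", index=False, header=False)
```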
```java
package test.temperature.TemperatureInfo.Temperature01;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * Goal: compute the average temperature per continent,
 * keyed by continent and year.
 */
public class TemperMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    private final Text k = new Text();
    private final DoubleWritable v = new DoubleWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input columns: Region, Country, State, City, Month, Day, Year, AvgTemperature
        String[] split = value.toString().split(",");
        // -99 marks a missing temperature, so skip those rows.
        if (!split[7].contains("-99")) {
            // Key: "Region<TAB>Year"; value: the temperature reading.
            k.set(split[0] + "\t" + split[6]);
            v.set(Double.parseDouble(split[split.length - 1]));
            context.write(k, v);
        }
    }
}
```
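To make the key/value shapes concrete, here is the Mapper's logic traced in Python on one made-up input row (the row itself is hypothetical, but it follows the column layout the Mapper assumes):

```python
# A made-up row in the city_temperature.csv layout:
line = "Africa,Algeria,,Algiers,1,1,1995,64.2"
fields = line.split(",")

key = fields[0] + "\t" + fields[6]   # -> "Africa\t1995"
value = float(fields[-1])            # -> 64.2
print(key, value)
```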

```java
package test.temperature.TemperatureInfo.Temperature01;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TemperReduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // Average all temperatures that share the same "Region<TAB>Year" key.
        int num = 0;
        double sum = 0;
        for (DoubleWritable val : values) {
            sum += val.get();
            num++;
        }
        context.write(key, new DoubleWritable(sum / num));
    }
}
```
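Each output line therefore has the form Region&lt;TAB&gt;Year&lt;TAB&gt;AvgTemp, which is what the plotting script later splits on. One caveat worth noting: this Reducer cannot safely be reused as a combiner, because an unweighted average of partial averages is not the overall mean, as this quick check shows:

```python
# Average of averages != overall average unless counts are carried along.
overall = (10 + 20 + 30) / 3             # 20.0
avg_of_avgs = ((10 + 20) / 2 + 30) / 2   # 22.5
assert overall != avg_of_avgs
```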

```java
package test.temperature.TemperatureInfo.Temperature01;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class TemperPartitioner extends Partitioner<Text, DoubleWritable> {

    // Route each record to a fixed reducer by continent, so every
    // continent ends up in its own output file.
    @Override
    public int getPartition(Text text, DoubleWritable doubleWritable, int numPartitions) {
        String yearCoun = text.toString();
        if (yearCoun.contains("Africa")) {
            return 0;
        } else if (yearCoun.contains("Asia")) {
            return 1;
        } else if (yearCoun.contains("Australia/South Pacific")) {
            return 2;
        } else if (yearCoun.contains("Europe")) {
            return 3;
        } else if (yearCoun.contains("Middle East")) {
            return 4;
        } else {
            return 5;
        }
    }
}
```
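With six reduce tasks (set in the Driver below), the job writes six files, part-r-00000 through part-r-00005, one per continent bucket. Summarized as a lookup table:

```python
# Which part-r file holds which continent, per the Partitioner above.
PARTITIONS = {
    "Africa": 0,                   # part-r-00000
    "Asia": 1,                     # part-r-00001
    "Australia/South Pacific": 2,  # part-r-00002
    "Europe": 3,                   # part-r-00003
    "Middle East": 4,              # part-r-00004
    # everything else -> 5           part-r-00005
}
```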

```java
package test.temperature.TemperatureInfo.Temperature01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TemperDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // 1. Get the configuration and the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Associate this Driver's jar
        job.setJarByClass(TemperDriver.class);

        // 3. Set the Mapper and Reducer classes
        job.setMapperClass(TemperMapper.class);
        job.setReducerClass(TemperReduce.class);

        // 4. Set the Mapper's output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Set the partitioner; the number of reduce tasks must match
        // the six buckets the partitioner can return.
        job.setPartitionerClass(TemperPartitioner.class);
        job.setNumReduceTasks(6);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\desk\\city_temperature.csv"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\desk\\Temperature\\25年各大洲的平均温度"));

        // 7. Submit the job and exit with its status
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
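The reduce output files are named part-r-00000 through part-r-00005 with no extension, while the plotting script below reads them with a .csv suffix, so presumably the files were renamed in between. A small helper could do that (a sketch, assuming the output directory from the Driver above):

```python
import glob
import os

# Add a .csv suffix to the six MapReduce output files so the plotting
# script below can read them as part-r-0000N.csv.
for p in glob.glob(r"D:\desk\Temperature\25年各大洲的平均温度\part-r-0000[0-5]"):
    os.rename(p, p + ".csv")
```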

```python
import pandas as pd
from pyecharts import options as opts
from pyecharts.charts import Line


def zone_Temperature():
    # Load the five continent files written by the MapReduce job; each line has
    # the form "Region\tYear\tAvgTemp". part-r-00005 ("other regions") is not plotted.
    base = r"D:\desk\Temperature\25年各大洲的平均温度"
    frames = [
        pd.read_csv(rf"{base}\part-r-0000{i}.csv", encoding="utf-8", header=None)
        for i in range(5)
    ]
    # Each file is read as a single column; split it on tabs into
    # [region, year, temperature].
    parts = [frame[0].str.split("\t") for frame in frames]

    labels = [p[0][0] for p in parts]            # continent name per file
    years = [parts[0][i][1] for i in range(25)]  # the same 25 years in every file
    tempers = [[float(p[i][2]) for i in range(25)] for p in parts]

    # One line series per continent, with a marker at each series' average.
    line = Line().add_xaxis(xaxis_data=years)
    for label, temper in zip(labels, tempers):
        line.add_yaxis(series_name=label, y_axis=temper)
    line.set_global_opts(
        title_opts=opts.TitleOpts(title="Average temperature of major continents over 25 years"),
        yaxis_opts=opts.AxisOpts(min_=60),
    )
    line.set_series_opts(
        markline_opts=opts.MarkLineOpts(
            data=[opts.MarkLineItem(type_="average", name="Average")]
        ),
    )
    return line
```
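As a usage sketch (the output file name here is made up), the returned chart can be rendered to a standalone HTML file, which the JavaWeb frontend could then embed or serve:

```python
if __name__ == "__main__":
    chart = zone_Temperature()
    chart.render("zone_temperature.html")  # hypothetical output path
```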
