- I. Use Hive to load the cleaned data from HDFS
- --------------------------------------------------------------------
- 1. Dynamically add a table partition
- $hive> alter table eshop.logs add partition(year=2018,month=11,day=25,hour=12,minute=51);
-
- 2. Load the data into the table.
- $hive> load data inpath '/data/eshop/cleaned/2018/11/25/12/58' into table eshop.logs partition(year=2018,month=11,day=25,hour=12,minute=51);
-
- 3. Query the top N
- $hive> select * from logs ;
- //top N in descending order (append "limit 10" to the query below to keep only the top 10 rows)
- $hive> select request,count(*) as c from logs where year = 2018 and month = 11 and day = 25 group by request order by c desc ;
-
- 4. Create the statistics result table
- $hive> create table stats(request string, c int) row format DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
- $hive> insert into stats select request,count(*) as c from logs where year = 2018 and month = 11 and day = 25 group by request order by c desc ;
-
- 5. Create the table in MySQL
- mysql> create table stats (id int primary key auto_increment,request varchar(200), c int);
-
- 6. Export the Hive data to MySQL with sqoop
- $>sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --driver com.mysql.jdbc.Driver --username mysql --password mysql --table stats --columns request,c --export-dir hdfs://s100/user/hive/warehouse/eshop.db/stats
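-
- Sanity-check the export in MySQL afterwards (an illustrative query, not from the original):
- mysql> select request, c from stats order by c desc limit 10;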
-
- 7. Wrap the steps above into scripts and schedule them with cron (the crontab entry is shown after the scripts below).
- a. Description
- At 2:00 AM every day, aggregate the previous day's logs.
-
- b. Create the bash scripts
- 1. Create the preparation script -- dynamically generates the Hive SQL script file [stat.ql].
- [/usr/local/bin/prestats.sh]
- #!/bin/bash
- # use yesterday's date: the job runs at 2:00 AM and aggregates the previous day's logs
- y=`date -d "-1 day" +%Y`
- m=`date -d "-1 day" +%m`
- d=`date -d "-1 day" +%d`
-
- # strip leading zeros (10# forces base 10; otherwise 08/09 break bash arithmetic)
- m=$(( 10#$m ))
- d=$(( 10#$d ))
- # remove the previous hql file
- rm -f stat.ql
-
- #add the partition (hour=9, minute=28 are hardcoded from the example cleaning run; adjust to your cleaning output)
- echo "alter table eshop.logs add if not exists partition(year=${y},month=${m},day=${d},hour=9,minute=28);" >> stat.ql
-
- #load the data into the partition
- echo "load data inpath 'hdfs://s201/user/centos/eshop/cleaned/${y}/${m}/${d}/9/28' into table eshop.logs partition(year=${y},month=${m},day=${d},hour=9,minute=28);" >> stat.ql
-
- #aggregate the data and insert the result into the stats table
- echo "insert into eshop.stats select request,count(*) as c from eshop.logs where year = ${y} and month = ${m} and day = ${d} and hour=9 and minute = 28 group by request order by c desc ;" >> stat.ql
-
- 2. Create the execution script
- [/usr/local/bin/exestats.sh]
- #!/bin/bash
- # generate the hive script file (absolute path so the script also works under cron)
- /usr/local/bin/prestats.sh
-
- #run the hive ql script
- hive -f stat.ql
-
- #export the stats to mysql with sqoop
- sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --username mysql --password mysql --table stats --columns request,c --export-dir /user/hive/warehouse/eshop.db/stats
- #sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --username mysql --password mysql --table stats --export-dir /user/hive/warehouse/eshop.db/stats
-
- 3. Make the scripts executable
- $>sudo chmod a+x /usr/local/bin/prestats.sh
- $>sudo chmod a+x /usr/local/bin/exestats.sh
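-
- c. Register the cron job. A minimal crontab entry matching the description above (the log path is just an example):
- $>crontab -e
- # every day at 2:00 AM, run the statistics job
- 0 2 * * * /usr/local/bin/exestats.sh >> /tmp/exestats.log 2>&1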
-
- 8. Write a Java client that performs the steps above
- a. Start hiveserver2 on the Hive host
- $> hiveserver2 &
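-
- You can verify the server with beeline before writing any code (host and port assumed from the JDBC URL used below):
- $> beeline -u jdbc:hive2://192.168.43.131:10000/eshop -e "show tables;"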
-
- b. Write a Java client that accesses the Hive data over JDBC
- 1) Create a HiveClient module and add Maven support
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.test</groupId>
- <artifactId>HiveClient</artifactId>
- <version>1.0-SNAPSHOT</version>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-jdbc</artifactId>
- <version>2.1.0</version>
- </dependency>
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>5.1.17</version>
- </dependency>
-
- </dependencies>
-
-
- </project>
- 2) Write a class that queries Hive and inserts into MySQL: StatDao.java
- package com.test.hiveclient;
-
- import java.sql.*;
- import java.util.HashMap;
- import java.util.Map;
-
- public class StatDao {
-
- private static Map<String, Integer> map = new HashMap<String, Integer>();
-
- public static void main(String [] args)
- {
- try {
- Class.forName("org.apache.hive.jdbc.HiveDriver");
- //open the connection to hiveserver2
- Connection conn = DriverManager.getConnection("jdbc:hive2://192.168.43.131:10000/eshop","","");
- System.out.println(conn);
- PreparedStatement ppst = conn.prepareStatement("select * from stats");
- ResultSet set = ppst.executeQuery();
- while (set.next()) {
- map.put(set.getString(1), set.getInt(2));
- System.out.print(set.getString(1) + " : ");
- System.out.print(set.getInt(2));
- System.out.println();
- }
- set.close();
- ppst.close();
- conn.close();
-
- getMysqlConn();
-
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- public static void getMysqlConn()
- {
- try {
- //load the mysql driver class
- Class.forName("com.mysql.jdbc.Driver");
- //database connection url
- String url = "jdbc:mysql://192.168.43.1:3306/eshop" ;
- //username
- String user = "mysql";
- //password
- String pass = "mysql" ;
-
- //get the connection
- Connection conn = DriverManager.getConnection(url, user, pass);
- //prepare the insert once and reuse it; the column is `c`, matching the MySQL table created above
- PreparedStatement pt = conn.prepareStatement("insert into stats (request,c) values(?,?)");
- for(String key : map.keySet() )
- {
- pt.setString(1, key);
- pt.setInt(2, map.get(key));
- pt.executeUpdate();
- }
- pt.close();
- conn.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
-
- II. Generate statistical charts with JFreeChart
- ----------------------------------------------------------
- 1.pom.xml
- <dependency>
- <groupId>jfree</groupId>
- <artifactId>jfreechart</artifactId>
- <version>1.0.13</version>
- </dependency>
-
- 2. Generate an image with JFreeChart
- package com.test.eshop.test;
-
- import org.jfree.chart.ChartFactory;
- import org.jfree.chart.ChartUtilities;
- import org.jfree.chart.JFreeChart;
- import org.jfree.chart.plot.PiePlot;
- import org.jfree.data.general.DefaultPieDataset;
- import org.junit.Test;
-
- import java.awt.*;
- import java.io.File;
-
- /**
- * Pie chart test
- */
- public class TestJfreechart {
-
- @Test
- public void pie() throws Exception {
- File f = new File("d:/pie.png");
-
- //dataset
- DefaultPieDataset ds = new DefaultPieDataset();
- ds.setValue("HuaWei",3000);
- ds.setValue("Apple",5000);
- ds.setValue("Mi",1890);
-
- JFreeChart chart = ChartFactory.createPieChart("Pie Chart Demo", ds, false, false, false);
-
- Font font = new Font("宋体",Font.BOLD,15); //SimSun; a CJK-capable font is only required for CJK titles
- chart.getTitle().setFont(font);
- //make the plot semi-transparent (this sets the foreground alpha)
- ((PiePlot)chart.getPlot()).setForegroundAlpha(0.2f);
- ((PiePlot)chart.getPlot()).setExplodePercent("Apple",0.1f);
- ((PiePlot)chart.getPlot()).setExplodePercent("HuaWei",0.2f);
- ((PiePlot)chart.getPlot()).setExplodePercent("Mi",0.3f);
-
-
- //save the chart as a PNG file, matching the .png file name above
- ChartUtilities.saveChartAsPNG(f, chart,400,300);
- }
- }
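-
- 3. In the web app, the chart can be streamed straight into the HTTP response instead of being written to disk. A minimal sketch, assuming a servlet environment (the class name, package, and dataset values are illustrative, not from the original):
- package com.test.eshop.web;
-
- import org.jfree.chart.ChartFactory;
- import org.jfree.chart.ChartUtilities;
- import org.jfree.chart.JFreeChart;
- import org.jfree.data.general.DefaultPieDataset;
-
- import javax.servlet.http.HttpServlet;
- import javax.servlet.http.HttpServletRequest;
- import javax.servlet.http.HttpServletResponse;
- import java.io.IOException;
-
- //hypothetical servlet: renders the pie chart as PNG bytes on the response stream
- public class ChartServlet extends HttpServlet {
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
- //dataset (illustrative values)
- DefaultPieDataset ds = new DefaultPieDataset();
- ds.setValue("HuaWei", 3000);
- ds.setValue("Apple", 5000);
- ds.setValue("Mi", 1890);
- JFreeChart chart = ChartFactory.createPieChart("Pie Chart Demo", ds, false, false, false);
- resp.setContentType("image/png");
- //write the PNG directly to the servlet output stream
- ChartUtilities.writeChartAsPNG(resp.getOutputStream(), chart, 400, 300);
- }
- }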
-
-
- III. Add a Spark recommendation system
- --------------------------------------------------------------------
- 1. Design the user-item table -- MySQL
- create table useritems(id int primary key auto_increment ,userid int, itemid int, score int , time timestamp);
-
- 2. Add the mapping file UserItem.hbm.xml
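-
- The mapping file itself is not shown here; a minimal sketch, assuming an entity class com.test.eshop.domain.UserItem whose fields mirror the useritems table above (the package name is an assumption):
- <?xml version="1.0"?>
- <!DOCTYPE hibernate-mapping PUBLIC
- "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
- "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">
- <hibernate-mapping>
- <class name="com.test.eshop.domain.UserItem" table="useritems">
- <!-- auto_increment primary key -->
- <id name="id" column="id">
- <generator class="identity"/>
- </id>
- <property name="userid" column="userid"/>
- <property name="itemid" column="itemid"/>
- <property name="score" column="score"/>
- <property name="time" column="time"/>
- </class>
- </hibernate-mapping>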
-
- 3.Dao + Service
-
- 4.controller
-
- 5. The Spark part
- a) Import the MySQL data into HDFS with sqoop (an incremental import keyed on id)
- $> sqoop import --connect jdbc:mysql://192.168.43.1:3306/eshop --driver com.mysql.jdbc.Driver --username mysql --password mysql --table useritems --columns userid,itemid,score -m 2 --target-dir /data/eshop/recommends --check-column id --incremental append --last-value 0
-
- b) Start the Spark cluster
-
- c) Start spark-shell
- $> spark-shell --master spark://s100:7077
-
- #the built-in SparkSession is available as `spark`
- $scala>
- import org.apache.spark.ml.evaluation.RegressionEvaluator
- import org.apache.spark.ml.recommendation.ALS
- import spark.implicits._
-
- case class UserItem(userId: Int, itemId: Int, score : Int);
-
- def parseRating(str: String): UserItem = {
- val fields = str.split(",")
- UserItem(fields(0).toInt, fields(1).toInt, fields(2).toInt)
- }
-
- val useritems = spark.read.textFile("hdfs://s100/data/eshop/recommends").map(parseRating).toDF()
-
- //val test = spark.read.textFile("hdfs://s100/data/eshop/testdata.txt").map(parseRating).toDF()
- val Array(training, test) = useritems.randomSplit(Array(0.8, 0.2))
-
- val als = new ALS()
- .setMaxIter(5)
- .setRegParam(0.01)
- .setUserCol("userId")
- .setItemCol("itemId")
- .setRatingCol("score")
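- .setColdStartStrategy("drop") // optional addition, Spark 2.2+: drop NaN predictions for users/items unseen in training so the RMSE below stays finite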
- val model = als.fit(training)
-
- val predictions = model.transform(test)
-
- val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("score").setPredictionCol("prediction")
- val rmse = evaluator.evaluate(predictions)
- println(s"Root-mean-square error = $rmse")
-
-
- //save the ALS model
- model.save("hdfs://s100/data/eshop/rec/model");
- spark.stop()
-
-
- //load the model back (in a new spark-shell session, since spark.stop() was called above)
- import org.apache.spark.ml.recommendation.ALSModel;
- val model = ALSModel.load("hdfs://s100/data/eshop/rec/model");
- val predictions = model.transform(test)
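-
- d) Generate recommendations. A short follow-up sketch: recommendForAllUsers is the ALSModel top-N API and requires Spark 2.2+ (the 10 is just an example):
- $scala>
- //top 10 item recommendations for every user
- val userRecs = model.recommendForAllUsers(10)
- userRecs.show(10, false)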