
Real-Time Product Recommendation System for a Shopping Platform (Part 5)

Real-Time Product Recommendation System
一 I. Use Hive to load the cleaned data from HDFS
--------------------------------------------------------------
1. Dynamically add a table partition
$hive> alter table eshop.logs add partition(year=2018,month=11,day=25,hour=12,minute=51);
2. Load the data into the table.
$hive> load data inpath '/data/eshop/cleaned/2018/11/25/12/58' into table eshop.logs partition(year=2018,month=11,day=25,hour=12,minute=51);
3. Query the top N
$hive> select * from logs ;
//top N, sorted in descending order
$hive> select request,count(*) as c from logs where year = 2018 and month = 11 and day = 25 group by request order by c desc ;
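The query above sorts the full result set; to keep only the first N rows (N=10 here, purely illustrative), append a LIMIT clause:
$hive> select request,count(*) as c from logs where year = 2018 and month = 11 and day = 25 group by request order by c desc limit 10;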
4. Create the statistics result table
$hive> create table stats(request string, c int) row format DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
$hive> insert into stats select request,count(*) as c from logs where year = 2018 and month = 11 and day = 25 group by request order by c desc ;
5. Create the table in MySQL
mysql> create table stats (id int primary key auto_increment,request varchar(200), c int);
6. Use sqoop to export the Hive data to MySQL
$>sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --driver com.mysql.jdbc.Driver --username mysql --password mysql --table stats --columns request,c --export-dir hdfs://s100/user/hive/warehouse/eshop.db/stats
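To verify the export, a quick check on the MySQL side (the LIMIT is illustrative):
mysql> select request, c from stats order by c desc limit 10;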
7. Wrap steps 2-6 above in scripts and schedule them with cron.
a. Description
At 2:00 AM every day, compute statistics over the previous day's logs.
b. Create the bash scripts
1. Create the preparation script -- dynamically generates the HiveQL script file [stat.ql].
[/usr/local/bin/prestats.sh]
#!/bin/bash
# use yesterday's date, matching the description above
y=`date -d "-1 day" +%Y`
m=`date -d "-1 day" +%m`
d=`date -d "-1 day" +%d`
# strip leading zeros (force base 10 so that 08 and 09 are not parsed as octal)
m=$(( 10#$m ))
d=$(( 10#$d ))
# delete the previous hql file
rm -f stat.ql
# add the partition
echo "alter table eshop.logs add if not exists partition(year=${y},month=${m},day=${d},hour=9,minute=28);" >> stat.ql
# load the data into the partition
echo "load data inpath 'hdfs://s201/user/centos/eshop/cleaned/${y}/${m}/${d}/9/28' into table eshop.logs partition(year=${y},month=${m},day=${d},hour=9,minute=28);" >> stat.ql
# aggregate and insert the result into the stats table
echo "insert into eshop.stats select request,count(*) as c from eshop.logs where year = ${y} and month = ${m} and day = ${d} and hour=9 and minute = 28 group by request order by c desc ;" >> stat.ql
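For reference, with y=2018, m=11, d=25 the generated stat.ql would look like this (the date and the hardcoded hour=9/minute=28 are illustrative):
alter table eshop.logs add if not exists partition(year=2018,month=11,day=25,hour=9,minute=28);
load data inpath 'hdfs://s201/user/centos/eshop/cleaned/2018/11/25/9/28' into table eshop.logs partition(year=2018,month=11,day=25,hour=9,minute=28);
insert into eshop.stats select request,count(*) as c from eshop.logs where year = 2018 and month = 11 and day = 25 and hour=9 and minute = 28 group by request order by c desc ;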
2. Create the execution script
[/usr/local/bin/exestats.sh]
#!/bin/bash
# generate the hive script file (absolute path so this also works when run from cron)
/usr/local/bin/prestats.sh
# run the hive ql script
hive -f stat.ql
# export to mysql with sqoop
sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --username mysql --password mysql --table stats --columns request,c --export-dir /user/hive/warehouse/eshop.db/stats
#sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --username mysql --password mysql --table stats --export-dir /user/hive/warehouse/eshop.db/stats
3. Make both scripts executable
$>sudo chmod a+x /usr/local/bin/prestats.sh
$>sudo chmod a+x /usr/local/bin/exestats.sh
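With the scripts executable, a crontab entry runs the job at 2:00 AM daily as described above; a minimal sketch (the log path is an assumption):
$> crontab -e
# daily stats job at 02:00; stdout/stderr go to an illustrative log file
0 2 * * * /usr/local/bin/exestats.sh >> /tmp/exestats.log 2>&1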
8. Write a Java client that performs the steps above
a. Start hiveserver2 on the Hive host
$> hiveserver2 &
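Before coding the client, it is worth confirming that HiveServer2 is reachable; a quick check with beeline (host and port match the JDBC URL used below):
$> beeline -u jdbc:hive2://192.168.43.131:10000/eshop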
b. Write a Java client that accesses the Hive data over JDBC
1) Create a new HiveClient module and add Maven support
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test</groupId>
    <artifactId>HiveClient</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>
    </dependencies>
</project>
2) Write a class StatDao.java that queries Hive and inserts the results into MySQL
package com.test.hiveclient;

import java.sql.*;
import java.util.HashMap;
import java.util.Map;

public class StatDao {
    private static Map<String, Integer> map = new HashMap<String, Integer>();

    public static void main(String[] args) {
        try {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            // open the connection to HiveServer2
            Connection conn = DriverManager.getConnection("jdbc:hive2://192.168.43.131:10000/eshop", "", "");
            System.out.println(conn);
            PreparedStatement ppst = conn.prepareStatement("select * from stats");
            ResultSet set = ppst.executeQuery();
            while (set.next()) {
                map.put(set.getString(1), set.getInt(2));
                System.out.println(set.getString(1) + " : " + set.getInt(2));
            }
            set.close();
            ppst.close();
            conn.close();
            getMysqlConn();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void getMysqlConn() {
        try {
            // load the MySQL driver class
            Class.forName("com.mysql.jdbc.Driver");
            // database connection url
            String url = "jdbc:mysql://192.168.43.1:3306/eshop";
            // username
            String user = "mysql";
            // password
            String pass = "mysql";
            // open the connection
            Connection conn = DriverManager.getConnection(url, user, pass);
            // prepare the statement once, outside the loop; the count column is named c, matching the DDL above
            PreparedStatement pt = conn.prepareStatement("insert into stats (request,c) values(?,?)");
            for (String key : map.keySet()) {
                pt.setString(1, key);
                pt.setInt(2, map.get(key));
                pt.executeUpdate();
            }
            pt.close();
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
二 II. Generate statistical charts with JFreeChart
----------------------------------------------------------
1. pom.xml
<dependency>
    <groupId>jfree</groupId>
    <artifactId>jfreechart</artifactId>
    <version>1.0.13</version>
</dependency>
2. Generate an image with JFreeChart
package com.test.eshop.test;

import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PiePlot;
import org.jfree.data.general.DefaultPieDataset;
import org.junit.Test;

import java.awt.*;
import java.io.File;

/**
 * Pie-chart test
 */
public class TestJfreechart {
    @Test
    public void pie() throws Exception {
        File f = new File("d:/pie.png");
        // dataset
        DefaultPieDataset ds = new DefaultPieDataset();
        ds.setValue("HuaWei", 3000);
        ds.setValue("Apple", 5000);
        ds.setValue("Mi", 1890);
        JFreeChart chart = ChartFactory.createPieChart("饼图演示", ds, false, false, false);
        Font font = new Font("宋体", Font.BOLD, 15);
        chart.getTitle().setFont(font);
        // make the plot semi-transparent
        ((PiePlot) chart.getPlot()).setForegroundAlpha(0.2f);
        // pull each slice out of the pie by a different amount
        ((PiePlot) chart.getPlot()).setExplodePercent("Apple", 0.1f);
        ((PiePlot) chart.getPlot()).setExplodePercent("HuaWei", 0.2f);
        ((PiePlot) chart.getPlot()).setExplodePercent("Mi", 0.3f);
        // save as PNG, matching the .png file name
        ChartUtilities.saveChartAsPNG(f, chart, 400, 300);
    }
}
三 III. Introduce the Spark recommendation system
--------------------------------------------------------------------
1. Design the user-item table -- MySQL
create table useritems(id int primary key auto_increment ,userid int, itemid int, score int , time timestamp);
2. Add the Hibernate mapping file UserItem.hbm.xml; a minimal sketch follows below.
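A minimal mapping sketch, assuming a UserItem entity whose fields mirror the useritems columns (the package name com.test.eshop.domain is hypothetical):
<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC
        "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
        "http://hibernate.org/dtd/hibernate-mapping-3.0.dtd">
<hibernate-mapping>
    <!-- maps the hypothetical UserItem entity onto the useritems table above -->
    <class name="com.test.eshop.domain.UserItem" table="useritems">
        <id name="id" column="id">
            <generator class="identity"/>
        </id>
        <property name="userid" column="userid"/>
        <property name="itemid" column="itemid"/>
        <property name="score" column="score"/>
        <property name="time" column="time"/>
    </class>
</hibernate-mapping>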
3. DAO + Service
4. Controller
5. Spark part
a) Import the MySQL data into HDFS with Sqoop (incremental, keyed on id)
$> sqoop import --connect jdbc:mysql://192.168.43.1:3306/eshop --driver com.mysql.jdbc.Driver --username mysql --password mysql --table useritems --columns userid,itemid,score -m 2 --target-dir /data/eshop/recommends --check-column id --incremental append --last-value 0
b) Start the Spark cluster
c) Start spark-shell
$> spark-shell --master spark://s100:7077
# the built-in SparkSession is available as `spark`
$scala>
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import spark.implicits._

case class UserItem(userId: Int, itemId: Int, score: Int)

def parseRating(str: String): UserItem = {
  val fields = str.split(",")
  UserItem(fields(0).toInt, fields(1).toInt, fields(2).toInt)
}

val useritems = spark.read.textFile("hdfs://s100/data/eshop/recommends").map(parseRating).toDF()
//val test = spark.read.textFile("hdfs://s100/data/eshop/testdata.txt").map(parseRating).toDF()
val Array(training, test) = useritems.randomSplit(Array(0.8, 0.2))

val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("itemId")
  .setRatingCol("score")
val model = als.fit(training)

val predictions = model.transform(test)
val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("score").setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// save the ALS model
model.save("hdfs://s100/data/eshop/rec/model")
spark.stop()

// load the model (in a new spark-shell session; the path must match the save path above, and test must be rebuilt as before)
import org.apache.spark.ml.recommendation.ALSModel
val model = ALSModel.load("hdfs://s100/data/eshop/rec/model")
val predictions = model.transform(test)
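From the reloaded model, per-user top-N lists can be generated directly; a short sketch (N=10 is illustrative, and recommendForAllUsers requires Spark 2.2+). Note also that if the test split contains users or items unseen during training, transform produces NaN predictions; on Spark 2.2+ this can be avoided with als.setColdStartStrategy("drop").
$scala>
// top 10 item recommendations for every user
val userRecs = model.recommendForAllUsers(10)
userRecs.show(false)
// top 10 user recommendations for every item
val itemRecs = model.recommendForAllItems(10)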