
Hadoop + Hive + Sqoop Offline Log Analysis: Guild Girls' Greeting Data

hadoop+hive+sqoop


Background:

We define a girl proactively making contact with a boy as a "greeting". In the app there are two ways for a girl to greet: initiating a text chat and initiating an audio/video chat. These events are collected through tracking points added to the application and end up as log files stored on the server. The log content looks like this:

Text chat, log file social_talklist_im_2020-06-23.log, sample content:

```
2020-06-23 23:59:44,10.3.1.32,[8988487,9050759]
2020-06-23 23:59:47,10.3.1.32,[9016540,8946882]
2020-06-23 23:59:47,10.3.1.32,[9011059,9050680]
```

The two uids in the last, bracketed field are the uid of the girl who initiated the text chat and the uid of the boy who received the message.

Audio/video chat, log file social_talklist_video_2020-06-23.log, sample content:

```
2020-06-23 23:59:33,10.3.1.34,["8935739",8808419]
2020-06-23 23:59:55,10.3.1.20,["9037381",9050732]
```

The requirement: taking a specified guild as an example, count, for a given day, the number of girls in that guild who greeted via text, the average number of greetings per girl, the number of boys who were greeted, and similar metrics.

Analysis:

Hive's SQL on Hadoop first needs a CSV-style file that we can import into a Hive table; SQL statements run against that table are then automatically turned into Hadoop MapReduce jobs and executed.

Where does the CSV file come from? We first upload the log files to HDFS, then write a map-only MapReduce program that parses the log files into CSV files.

In addition, the family table and the guild member table are stored in MySQL; we need to import them into Hive so that we can run join queries against them.

Implementation:

1. Upload the log files to HDFS

Add the dependency to the Gradle build file: `implementation "org.apache.hadoop:hadoop-client:$hadoopVersion"`

```bash
gradle installDist    # build an installable distribution with start scripts
cd build/install/LogAnalysic    # enter the install directory
bin/LogAnalysic com.mustafa.bigdata.Upload2Hdfs /home/mustafa/Documents/bigdata/jiazu/input/social_talklist_im_2020-06-23.log bigdata/jiazu/input/im/2020-06-23.log    # upload a log file
bin/LogAnalysic com.mustafa.bigdata.Upload2Hdfs /home/mustafa/Documents/bigdata/jiazu/input/social_talklist_im_2020-06-24.log bigdata/jiazu/input/im/2020-06-24.log
bin/LogAnalysic com.mustafa.bigdata.Upload2Hdfs /home/mustafa/Documents/bigdata/jiazu/input/social_talklist_video_2020-06-23.log bigdata/jiazu/input/video/2020-06-23.log
bin/LogAnalysic com.mustafa.bigdata.Upload2Hdfs /home/mustafa/Documents/bigdata/jiazu/input/social_talklist_video_2020-06-24.log bigdata/jiazu/input/video/2020-06-24.log
```

 

For the code, see the following link: uploading files to HDFS.
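The original post links out for that code; as a stand-in, here is a minimal sketch of what such an Upload2Hdfs class could look like, assuming it receives the local path and the HDFS destination as its two arguments and copies the file with the HDFS FileSystem API (how `bin/LogAnalysic` dispatches to the class, and the actual implementation, are left to the repository linked at the end of this article):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch (assumed, not the original implementation): upload a local file to HDFS.
public class Upload2Hdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();      // picks up core-site.xml / hdfs-site.xml from the classpath
        try (FileSystem fs = FileSystem.get(conf)) {
            Path src = new Path(args[0]);              // local log file, e.g. .../social_talklist_im_2020-06-23.log
            Path dst = new Path(args[1]);              // HDFS target, e.g. bigdata/jiazu/input/im/2020-06-23.log
            fs.copyFromLocalFile(src, dst);            // copy the local file up to HDFS
        }
    }
}
```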

2. Parse the log files into CSV files

We need to package the project as a jar and run the MapReduce job on YARN.

Gradle additionally needs the following dependencies:

```groovy
implementation 'com.google.code.gson:gson:2.6.2'
implementation 'commons-lang:commons-lang:2.6'
```

```bash
gradle jar
HADOOP_HOME=/data/home/software/hadoop-2.6.0-cdh5.16.2
$HADOOP_HOME/bin/yarn jar \
    build/libs/LogAnalysic-1.0-SNAPSHOT.jar \
    com.mustafa.bigdata.ParseLog2Csv bigdata/jiazu/input/im bigdata/jiazu/output/im
$HADOOP_HOME/bin/yarn jar \
    build/libs/LogAnalysic-1.0-SNAPSHOT.jar \
    com.mustafa.bigdata.ParseLog2Csv bigdata/jiazu/input/video bigdata/jiazu/output/video
```

For the code, see the following link: parsing the logs into CSV files.
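The original post again links out for the parser; below is a minimal map-only sketch of what such a ParseLog2Csv job could look like. It assumes each log line splits into timestamp, IP, and a bracketed uid pair, uses Gson (which is why the dependency was added above) so that both quoted and unquoted uids parse, and writes the four tab-separated columns (id, fuid, tuid, time) that the Hive tables in step 4 expect; the id here is just a per-mapper counter:

```java
import java.io.IOException;

import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Minimal sketch (assumed, not the original implementation) of a map-only log-to-CSV job.
public class ParseLog2Csv {

    public static class ParseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private long id = 0;                           // simple per-mapper row counter used as the id column
        private final Text out = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // e.g. 2020-06-23 23:59:44,10.3.1.32,[8988487,9050759]
            String[] parts = value.toString().split(",", 3);   // timestamp, ip, uid pair
            if (parts.length < 3) {
                return;                                // skip malformed lines
            }
            // Gson tolerates both [9016540,8946882] and ["8935739",8808419]
            JsonArray uids = new JsonParser().parse(parts[2]).getAsJsonArray();
            long fuid = uids.get(0).getAsLong();       // girl who greets
            long tuid = uids.get(1).getAsLong();       // boy who is greeted
            out.set((++id) + "\t" + fuid + "\t" + tuid + "\t" + parts[0]);
            context.write(out, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "parse-log-to-csv");
        job.setJarByClass(ParseLog2Csv.class);
        job.setMapperClass(ParseMapper.class);
        job.setNumReduceTasks(0);                      // map-only: the mapper output is the final file
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));    // e.g. bigdata/jiazu/input/im
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // e.g. bigdata/jiazu/output/im
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```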

3. Import the family table and the family member table from MySQL into Hive

```bash
bin/sqoop import \
  --connect jdbc:mysql://mustafa-PC:3306/jiazu \
  --username root \
  --password 123456 \
  --columns jzid,jzname \
  --table groups \
  --num-mappers 1 \
  --mapreduce-job-name jiazu_groups_to_hive \
  --fields-terminated-by '\t' \
  --target-dir /user/mustafa/hive/tables/jiazu/groups \
  --delete-target-dir \
  --hive-import \
  --create-hive-table \
  --hive-database jiazu \
  --hive-overwrite \
  --hive-table groups

bin/sqoop import \
  --connect jdbc:mysql://mustafa-PC:3306/jiazu \
  --username root \
  --password 123456 \
  --columns jzid,uid \
  --table member \
  --num-mappers 1 \
  --mapreduce-job-name jiazu_member_to_hive \
  --fields-terminated-by '\t' \
  --target-dir /user/mustafa/hive/tables/jiazu/member \
  --delete-target-dir \
  --hive-import \
  --create-hive-table \
  --hive-database jiazu \
  --hive-overwrite \
  --hive-table member
```
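After both imports finish, a quick row-count check over Hive JDBC is a cheap way to confirm the two tables landed in the jiazu database. This small helper is not part of the original project; the hostname, port, and user simply reuse the values that appear elsewhere in this post:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical sanity check: count the rows Sqoop imported into jiazu.groups and jiazu.member.
public class CheckSqoopImport {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://mustafa-PC:10000/jiazu", "mustafa", null);
             Statement st = conn.createStatement()) {
            for (String table : new String[]{"groups", "member"}) {
                ResultSet rs = st.executeQuery("select count(*) from " + table);
                if (rs.next()) {
                    System.out.println(table + ": " + rs.getLong(1) + " rows");
                }
            }
        }
    }
}
```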

4. Analyze the offline logs with Hive

The analysis boils down to the following two SQL statements: the first resolves the uids of the girls in the specified guild, and the second counts, for one day, the total number of greetings (greet_times), the number of distinct girls who greeted (greet_users), and the number of distinct boys who were greeted (disturb_users):

```sql
select m.uid from groups g left join member m on g.jzid = m.jzid where g.jzname = 'aaa'

select count(fuid) as greet_times, count(distinct fuid) as greet_users, count(distinct tuid) as disturb_users
from im
where fuid in ($sql1) and time >= '2020-06-23 00:00:00' and time < '2020-06-24 00:00:00'
```

Add the dependency in Gradle: `implementation "org.apache.hive:hive-jdbc:$hiveVersion"`

Code:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class LoadCsv2Table {
    public static void main(String[] args) throws UnknownHostException, SQLException, ClassNotFoundException {
        // connect to HiveServer2 running on the local host
        InetAddress addr = InetAddress.getLocalHost();
        String hostname = addr.getHostName();
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        String jdbc_url = "jdbc:hive2://" + hostname + ":10000/jiazu";
        Connection conn = DriverManager.getConnection(jdbc_url, "mustafa", null);
        Statement st = conn.createStatement();
        // create the im table and load the parsed text-chat files into it
        st.execute("create table if not exists im (\n" +
                "    id int,\n" +
                "    fuid int,\n" +
                "    tuid int,\n" +
                "    time TIMESTAMP\n" +
                ")\n" +
                "row format delimited fields terminated by '\\t'");
        st.execute("load data inpath '/user/mustafa/bigdata/jiazu/output/im' into table im");
        // create the video table and load the parsed audio/video-chat files into it
        st.execute("create external table if not exists video (\n" +
                "    id int,\n" +
                "    fuid int,\n" +
                "    tuid int,\n" +
                "    time TIMESTAMP\n" +
                ")\n" +
                "row format delimited fields terminated by '\\t'");
        st.execute("load data inpath '/user/mustafa/bigdata/jiazu/output/video' into table video");
        // resolve the uids of the girls belonging to the guild named 'SP'
        StringBuilder uidsStringBuilder = new StringBuilder("");
        ResultSet resultset = st.executeQuery("select m.uid from groups g left join member m on g.jzid = m.jzid where g.jzname = 'SP'");
        while (resultset.next()) {
            uidsStringBuilder.append(resultset.getString("uid")).append(",");
        }
        String uids = uidsStringBuilder.toString().substring(0, uidsStringBuilder.toString().length() - 1); // drop the trailing comma
        // aggregate greetings per day and per channel, writing each result to a local directory
        st.execute("insert overwrite local directory '/home/mustafa/Desktop/im/im-2020-06-23' row format delimited fields terminated by ',' select count(fuid) as greet_times, count(distinct fuid) as greet_users, count(distinct tuid) as disturb_users from im where fuid in (" + uids + ") and time >= '2020-06-23 00:00:00' and time < '2020-06-24 00:00:00'");
        st.execute("insert overwrite local directory '/home/mustafa/Desktop/im/im-2020-06-24' row format delimited fields terminated by ',' select count(fuid) as greet_times, count(distinct fuid) as greet_users, count(distinct tuid) as disturb_users from im where fuid in (" + uids + ") and time >= '2020-06-24 00:00:00' and time < '2020-06-25 00:00:00'");
        st.execute("insert overwrite local directory '/home/mustafa/Desktop/im/video-2020-06-23' row format delimited fields terminated by ',' select count(fuid) as greet_times, count(distinct fuid) as greet_users, count(distinct tuid) as disturb_users from video where fuid in (" + uids + ") and time >= '2020-06-23 00:00:00' and time < '2020-06-24 00:00:00'");
        st.execute("insert overwrite local directory '/home/mustafa/Desktop/im/video-2020-06-24' row format delimited fields terminated by ',' select count(fuid) as greet_times, count(distinct fuid) as greet_users, count(distinct tuid) as disturb_users from video where fuid in (" + uids + ") and time >= '2020-06-24 00:00:00' and time < '2020-06-25 00:00:00'");
        st.close();
        conn.close();
    }
}
```

The analysis results are saved locally under `/home/mustafa/Desktop/im`.

For the full code, see the following link:

https://github.com/jiangliuer326442/LogAnalysic
 
