
Big Data Offline Data Warehouse Project: A Detailed Tutorial

Table of Contents

I. Data preparation: /opt/eventdata
II. Environment preparation
III. Create the Kafka topics
IV. Ingest the files into Kafka with Flume
(1) Create an events directory under /opt/soft/flume190/conf/ and create five conf files
(2) events.conf
(3) ea.conf
1. Contents of ea.conf
2. Start Flume
3. Copy the file to the target directory
4. Check the message count of the event_attendees_raw topic
(4) users.conf
1. Contents of users.conf
2. Start Flume
3. Copy the file to the target directory
4. Check the message count of the users topic
(5) uf.conf
1. Contents of uf.conf
2. Start Flume
3. Copy the file to the target directory
4. Check the message count of the user_friends_raw topic
(6) train.conf
1. Contents of train.conf
2. Start Flume
3. Copy the file to the target directory
4. Check the message count of the train topic
V. Clean the Kafka messages with Java and write them back to Kafka
VI. Store the cleaned Kafka data in HBase
VII. Create Hive tables mapped to the HBase data
(1) DWD layer
1. dwd_events database
2. users table
3. events table
4. user_friend table
5. event_attendee table
6. train table
7. locale table
8. time_zone table
9. Tables in dwd_events
(2) DWS layer: Hive implementation
1. dws_events database
2. user_friend_count table: number of friends per user
3. event_attendee_count table: attendance per event
4. event_user_state table: each invited friend's response per event
5. user_event_status table: consolidation of event_user_state
6. user_attend_event_count table: per-user counts of event invitations, attendances, and non-attendances
7. friend_attend_state table: invitation responses of each user's friends
8. friend_attend_summary table: for each user and event, how many friends were invited, attended, did not attend, or may attend
9. event_cities table: top 32 cities with the most events
10. event_countries table: top 8 countries with the most events
11. Tables in the DWS layer
(3) DWS layer: Spark implementation
1. Environment setup
2. user_friend_count table: number of friends per user
3. event_attendee_count table: attendance per event
4. event_user_state table: each invited friend's response per event
5. user_event_status table: consolidation of event_user_state
6. user_attend_event_count table: per-user counts of event invitations, attendances, and non-attendances
7. friend_attend_state table: invitation responses of each user's friends
8. friend_attend_summary table: for each user and event, how many friends were invited, attended, did not attend, or may attend
9. event_cities table: top 32 cities with the most events
10. event_countries table: top 8 countries with the most events
11. Verify the tables were saved
(4) DM layer: wide detail tables
1. dm_events database
2. user_event_1 table: country, city, and other info for each event


I. Data preparation: /opt/eventdata

The raw CSV files used in this project (users, events, user_friends, event_attendees, and train) are placed under /opt/eventdata.

II. Environment preparation

Start Hadoop, Hive, MySQL, ZooKeeper, Kafka, and HBase.

III. Create the Kafka topics

➢ Users
kafka-topics.sh --zookeeper lxm147:2181 --create --topic users --partitions 1 --replication-factor 1
➢ User_Friends
kafka-topics.sh --zookeeper lxm147:2181 --create --topic user_friends --partitions 1 --replication-factor 1
➢ user_friends_raw
kafka-topics.sh --zookeeper lxm147:2181 --create --topic user_friends_raw --partitions 1 --replication-factor 1
➢ Events
kafka-topics.sh --zookeeper lxm147:2181 --create --topic events --partitions 1 --replication-factor 1
➢ Event_Attendees
kafka-topics.sh --zookeeper lxm147:2181 --create --topic event_attendees --partitions 1 --replication-factor 1
➢ event_attendees_raw
kafka-topics.sh --zookeeper lxm147:2181 --create --topic event_attendees_raw --partitions 1 --replication-factor 1
➢ Train
kafka-topics.sh --zookeeper lxm147:2181 --create --topic train --partitions 1 --replication-factor 1
➢ Test
kafka-topics.sh --zookeeper lxm147:2181 --create --topic test --partitions 1 --replication-factor 1

IV. Ingest the files into Kafka with Flume

(1) Create an events directory under /opt/soft/flume190/conf/ and create five conf files

[root@lxm147 events]# pwd
/opt/soft/flume190/conf/events
[root@lxm147 events]# ls
ea.conf events.conf train.conf uf.conf users.conf

(2) events.conf

For events.conf, see the earlier post 《Flume采集数据到Kafka操作详解》 (a detailed walkthrough of collecting data into Kafka with Flume).

The collection tasks that follow use the same method as that post.

(3) ea.conf

1. Contents of ea.conf:

ea.sources=eaSource
ea.channels=eaChannel
ea.sinks=eaSink
ea.sources.eaSource.type=spooldir
ea.sources.eaSource.spoolDir=/opt/flumelogfile/ea
ea.sources.eaSource.deserializer=LINE
ea.sources.eaSource.deserializer.maxLineLength=320000
ea.sources.eaSource.includePattern=ea_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
ea.sources.eaSource.interceptors=head_filter
ea.sources.eaSource.interceptors.head_filter.type=regex_filter
ea.sources.eaSource.interceptors.head_filter.regex=^event*
ea.sources.eaSource.interceptors.head_filter.excludeEvents=true
ea.channels.eaChannel.type=file
ea.channels.eaChannel.checkpointDir=/opt/flumelogfile/checkpoint/ea
ea.channels.eaChannel.dataDirs=/opt/flumelogfile/data/ea
ea.sinks.eaSink.type=org.apache.flume.sink.kafka.KafkaSink
ea.sinks.eaSink.batchSize=640
ea.sinks.eaSink.brokerList=192.168.180.147:9092
ea.sinks.eaSink.topic=event_attendees_raw
ea.sources.eaSource.channels=eaChannel
ea.sinks.eaSink.channel=eaChannel

2. Start Flume

[root@lxm147 flume190]# ./bin/flume-ng agent --name ea --conf ./conf/ --conf-file ./conf/events/ea.conf -Dflume.root.logger=INFO,console

3. Copy the file to the target directory (the spoolDir configured above)

cp /opt/eventdata/event_attendees.csv /opt/flumelogfile/ea/ea_2023-04-01.csv

4. Check the message count of the event_attendees_raw topic

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic event_attendees_raw
event_attendees_raw:0:24144

(4) users.conf

1. Contents of users.conf:

users.sources=usersSource
users.channels=usersChannel
users.sinks=userSink
users.sources.usersSource.type=spooldir
users.sources.usersSource.spoolDir=/opt/flumelogfile/users
users.sources.usersSource.deserializer=LINE
users.sources.usersSource.deserializer.maxLineLength=320000
users.sources.usersSource.includePattern=user_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
users.sources.usersSource.interceptors=head_filter
users.sources.usersSource.interceptors.head_filter.type=regex_filter
users.sources.usersSource.interceptors.head_filter.regex=^user_id*
users.sources.usersSource.interceptors.head_filter.excludeEvents=true
users.channels.usersChannel.type=file
users.channels.usersChannel.checkpointDir=/opt/flumelogfile/checkpoint/users
users.channels.usersChannel.dataDirs=/opt/flumelogfile/data/users
users.sinks.userSink.type=org.apache.flume.sink.kafka.KafkaSink
users.sinks.userSink.batchSize=640
users.sinks.userSink.brokerList=192.168.180.147:9092
users.sinks.userSink.topic=users
users.sources.usersSource.channels=usersChannel
users.sinks.userSink.channel=usersChannel

2. Start Flume

[root@lxm147 flume190]# ./bin/flume-ng agent --name users --conf ./conf/ --conf-file ./conf/events/users.conf -Dflume.root.logger=INFO,console

3. Copy the file to the target directory (the file name must match the includePattern above)

cp /opt/eventdata/users.csv /opt/flumelogfile/users/user_2023-04-01.csv

4. Check the message count of the users topic

[root@lxm147 events]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic users
users:0:38209

(5) uf.conf

1. Contents of uf.conf:

userfriends.sources=userfriendsSource
userfriends.channels=userfriendsChannel
userfriends.sinks=userfriendsSink
userfriends.sources.userfriendsSource.type=spooldir
userfriends.sources.userfriendsSource.spoolDir=/opt/flumelogfile/uf
userfriends.sources.userfriendsSource.deserializer=LINE
userfriends.sources.userfriendsSource.deserializer.maxLineLength=320000
userfriends.sources.userfriendsSource.includePattern=uf_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
userfriends.sources.userfriendsSource.interceptors=head_filter
userfriends.sources.userfriendsSource.interceptors.head_filter.type=regex_filter
userfriends.sources.userfriendsSource.interceptors.head_filter.regex=^user*
userfriends.sources.userfriendsSource.interceptors.head_filter.excludeEvents=true
userfriends.channels.userfriendsChannel.type=file
userfriends.channels.userfriendsChannel.checkpointDir=/opt/flumelogfile/checkpoint/uf
userfriends.channels.userfriendsChannel.dataDirs=/opt/flumelogfile/data/uf
userfriends.sinks.userfriendsSink.type=org.apache.flume.sink.kafka.KafkaSink
userfriends.sinks.userfriendsSink.batchSize=640
userfriends.sinks.userfriendsSink.brokerList=192.168.180.147:9092
userfriends.sinks.userfriendsSink.topic=user_friends_raw
userfriends.sources.userfriendsSource.channels=userfriendsChannel
userfriends.sinks.userfriendsSink.channel=userfriendsChannel

2. Start Flume

[root@lxm147 flume190]# ./bin/flume-ng agent --name userfriends --conf ./conf/ --conf-file ./conf/events/uf.conf -Dflume.root.logger=INFO,console

3. Copy the file to the target directory (the spoolDir configured above)

cp /opt/eventdata/user_friends.csv /opt/flumelogfile/uf/uf_2023-04-01.csv

4. Check the message count of the user_friends_raw topic

[root@lxm147 flumelogfile]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic user_friends_raw
user_friends_raw:0:38202

(6) train.conf

1. Contents of train.conf:

train.sources=trainSource
train.channels=trainChannel
train.sinks=trainSink
train.sources.trainSource.type=spooldir
train.sources.trainSource.spoolDir=/opt/flumelogfile/train
train.sources.trainSource.deserializer=LINE
train.sources.trainSource.deserializer.maxLineLength=320000
train.sources.trainSource.includePattern=train_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
train.sources.trainSource.interceptors=head_filter
train.sources.trainSource.interceptors.head_filter.type=regex_filter
train.sources.trainSource.interceptors.head_filter.regex=^user*
train.sources.trainSource.interceptors.head_filter.excludeEvents=true
train.channels.trainChannel.type=file
train.channels.trainChannel.checkpointDir=/opt/flumelogfile/checkpoint/train
train.channels.trainChannel.dataDirs=/opt/flumelogfile/data/train
train.sinks.trainSink.type=org.apache.flume.sink.kafka.KafkaSink
train.sinks.trainSink.batchSize=640
train.sinks.trainSink.brokerList=192.168.180.147:9092
train.sinks.trainSink.topic=train
train.sources.trainSource.channels=trainChannel
train.sinks.trainSink.channel=trainChannel

2. Start Flume

[root@lxm147 flume190]# ./bin/flume-ng agent --name train --conf ./conf/ --conf-file ./conf/events/train.conf -Dflume.root.logger=INFO,console

3. Copy the file to the target directory (the spoolDir configured above)

cp /opt/eventdata/train.csv /opt/flumelogfile/train/train_2023-04-01.csv

4. Check the message count of the train topic

[root@lxm147 flumelogfile]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic train
train:0:15398

V. Clean the Kafka messages with Java and write them back to Kafka

See the post 《KafkaStream——Spark对Kafka的数据进行清洗(java语言编写)》 for the full implementation; a minimal sketch of the idea is shown below.
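The sketch below is only an outline of that post's approach, not its actual code. It assumes the raw user_friends_raw messages are the CSV lines Flume ingested, i.e. "userid,friend1 friend2 ...", and uses Kafka Streams to flatten each line into one "userid,friendid" record per friend before writing to the cleaned user_friends topic; event_attendees_raw can be flattened the same way into "eventid,friendid,attendtype" records.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class UserFriendsClean {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user_friends_clean");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // assumed raw line format: "userid,friend1 friend2 friend3 ..."
        builder.<String, String>stream("user_friends_raw")
                .flatMapValues(line -> {
                    List<String> out = new ArrayList<>();
                    String[] parts = line.split(",", -1);
                    if (parts.length == 2 && !parts[1].trim().isEmpty()) {
                        for (String friend : parts[1].trim().split(" ")) {
                            out.add(parts[0] + "," + friend); // one "userid,friendid" record per friend
                        }
                    }
                    return out;
                })
                .to("user_friends");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}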

VI. Store the cleaned Kafka data in HBase

See the post 《日志项目之——将kafka数据存入hbase中》 for the full implementation; a minimal sketch is shown below.
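Again, this is only a sketch under stated assumptions rather than the referenced post's code: a plain KafkaConsumer reads the cleaned user_friends topic (assumed message format "userid,friendid") and writes each record into the events_db:user_friend HBase table that the Hive DDL below maps to. The row key used here (hash of userid+friendid) is a hypothetical choice, and the other topics and tables would be loaded the same way.

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class UserFriendsToHBase {
    public static void main(String[] args) throws Exception {
        // consumer for the cleaned user_friends topic
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user_friends_to_hbase");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("user_friends"));

        // HBase connection; ZooKeeper quorum is the same host as above
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.180.147");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("events_db:user_friend"))) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                List<Put> puts = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    String[] f = record.value().split(",", -1); // expected "userid,friendid"
                    if (f.length != 2) continue;
                    // row-key scheme is an assumption; the original post may derive it differently
                    Put put = new Put(Bytes.toBytes((f[0] + f[1]).hashCode()));
                    put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("user_id"), Bytes.toBytes(f[0]));
                    put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("friend_id"), Bytes.toBytes(f[1]));
                    puts.add(put);
                }
                if (!puts.isEmpty()) {
                    table.put(puts);
                }
            }
        }
    }
}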

VII. Create Hive tables mapped to the HBase data

(1) DWD layer

1. dwd_events database

create database if not exists dwd_events;
use dwd_events;
-- enable dynamic partitioning
-- basic optimization settings
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=true;
-- disable automatic map-join conversion
set hive.auto.convert.join=false;

2. users table

-- Hive external table mapped to the HBase table
drop table if exists hb_users;
create external table hb_users
(
userid string,
birthyear int,
gender string,
locale string,
location string,
timezone string,
joinedat string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with SERDEPROPERTIES
('hbase.columns.mapping' =
':key,profile:birthyear,profile:gender,region:locale,region:location,region:timezone,registration:joinedAt')
tblproperties ('hbase.table.name' = 'events_db:users');
-- copy into an internal table stored as ORC
create table users stored as orc as select * from hb_users;
drop table if exists hb_users;
select * from users;

3. events table

drop table if exists hb_events;
create external table hb_events
(
eventid string,
userid string,
starttime string,
city string,
state string,
zip string,
country string,
lat float,
lng float,
commonwords string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with SERDEPROPERTIES
('hbase.columns.mapping' =
':key,creator:userid,schedule:starttime,location:city,location:state,location:zip,location:country,location:lat,location:lng,remark:commonwords')
tblproperties ('hbase.table.name' = 'events_db:events');
set mapreduce.framework.name=local;
set hive.exec.mode.local.auto=true;
create table if not exists events stored as orc as select * from hb_events;
select * from events;
drop table if exists hb_events;

4. user_friend table

drop table if exists hb_user_friend;
create external table if not exists hb_user_friend
(
rowkey string,
userid string,
friendid string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with SERDEPROPERTIES
('hbase.columns.mapping' =
':key,uf:user_id,uf:friend_id')
tblproperties ('hbase.table.name' = 'events_db:user_friend');
create table if not exists user_friend stored as orc as select * from hb_user_friend;
select count(*) from user_friend; -- 30279525
select * from hb_user_friend;
drop table if exists hb_user_friend;

5. event_attendee table

drop table if exists hb_event_attendee;
create external table hb_event_attendee
(
rowkey string,
eventid string,
friendid string,
attendtype string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with SERDEPROPERTIES
('hbase.columns.mapping' =
':key,euat:eventid,euat:friendid,euat:state')
tblproperties ('hbase.table.name' = 'events_db:event_attendee');
create table event_attendee stored as orc as select * from hb_event_attendee;
select * from event_attendee;
drop table if exists hb_event_attendee;

6. train table

drop table if exists hb_train;
create external table if not exists hb_train
(
rowkey string,
userid string,
eventid string,
invited string,
`timestamp` string,
interested string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with SERDEPROPERTIES
('hbase.columns.mapping' =
':key,eu:user,eu:event,eu:invited,eu:timestamp,eu:interested')
tblproperties ('hbase.table.name' = 'events_db:train');
create table train stored as orc as select * from hb_train;
select count(*) from train; -- 15398
drop table if exists hb_train;
select * from train;

7. locale table

create external table locale(
locale_id int,
locale string
)row format delimited fields terminated by '\t'
location '/events/ods/data/locale';
select * from locale;

8. time_zone table

create external table time_zone(
time_zone_id int,
time_zone string
)row format delimited fields terminated by ','
location '/events/ods/data/timezone';
select * from time_zone;

9. Tables in dwd_events

(2) DWS layer: Hive implementation

Note: transactional tables must be stored as ORC.

1. dws_events database

-- user interest analysis
-- userid, eventid, and whether the user is interested in a given event
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=true;
set hive.auto.convert.join=false;
create database if not exists dws_events;
use dws_events;

2. user_friend_count table: number of friends per user

-- count how many friends each user has
-- user_friend: number of friends per user
create table user_friend_count stored as orc as
select userid,
count(friendid) friendcount
from dwd_events.user_friend
where friendid is not null
and trim(friendid) != ''
group by userid;
select *
from user_friend_count
limit 10;

3. event_attendee_count table: attendance per event

create table event_attendee_count stored as orc as
select eventid,
attendtype,
count(friendid) attendCount
from dwd_events.event_attendee
group by eventid, attendtype;
select *
from event_attendee_count
limit 10;

Next, reshape the table:

4. event_user_state table: each invited friend's response per event

create table if not exists dws_events.event_user_state
stored as orc as
select eventid,
friendid,
case when attendtype = 'invitedID' then 1 else 0 end as invited,
case when attendtype = 'yes' then 1 else 0 end as yes,
case when attendtype = 'noID' then 1 else 0 end as no,
case when attendtype = 'maybeID' then 1 else 0 end as maybe
from dwd_events.event_attendee;

5. user_event_status table: consolidation of event_user_state

create table if not exists user_event_status stored as orc as
select t1.friendid attend_userid,
t1.eventid,
max(t1.invited) invited,
max(t1.yes) attended,
max(t1.no) not_attended,
max(t1.maybe) maybe_attended
from event_user_state t1
group by t1.eventid, t1.friendid;

6. user_attend_event_count table: per-user counts of event invitations, attendances, and non-attendances

create table if not exists user_attend_event_count stored as orc as
select attend_userid,
sum(invited) as invited_count,
sum(attended) as attended_count,
sum(not_attended) as not_attended_count,
sum(maybe_attended) as maybe_attended_count
from user_event_status
group by attend_userid;

7. friend_attend_state table: invitation responses of each user's friends

The dwd_events.user_friend table:

The dws_events.user_event_status table:

Join the two tables above to get each friend's invitation responses:

create table if not exists friend_attend_state stored as orc as
select uf.userid,
uf.friendid,
ues.eventid,
case when ues.invited > 0 then 1 else 0 end as invited,
case when ues.attended > 0 then 1 else 0 end as attended,
case when ues.not_attended > 0 then 1 else 0 end as not_attended,
case when ues.maybe_attended > 0 then 1 else 0 end as maybe_attended
from dwd_events.user_friend uf
left join dws_events.user_event_status ues on ues.attend_userid = uf.friendid;

8. friend_attend_summary table: for each user and event, how many friends were invited, attended, did not attend, or may attend

create table if not exists friend_attend_summary stored as orc as
select userid,
eventid,
sum(invited) invited_friends_count,
sum(attended) attended_friends_count,
sum(not_attended) not_attended_friends_count,
sum(maybe_attended) maybe_attended_friends_count
from friend_attend_state
where eventid is not null
group by userid, eventid;

9. event_cities table: top 32 cities with the most events

create table if not exists event_cities stored as orc as
select case when t.city <> '' then t.city else 'nocity' end city,
-- if(t.city <> '', t.city, 'nocity'),
t.count,
row_number() over (order by t.count desc) as level
from (
select city,
count(*) count
from dwd_events.events
-- where city is not null and city != ''
group by city
order by count desc
limit 32) t;

10. event_countries table: top 8 countries with the most events

create table if not exists event_countries stored as orc as
select case when t.country <> '' then t.country else 'nocountry' end as country,
t.count,
row_number() over (order by t.count desc) level
from (
select country,
count(*) count
from dwd_events.events
group by country
order by count desc
limit 8) t;

11. Tables in the DWS layer

(3) DWS layer: Spark implementation

1. Environment setup

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SparkToHive {
  def main(args: Array[String]): Unit = {
    // Spark session that reads Hive tables through the metastore
    val spark: SparkSession = SparkSession.builder().appName("sparkhive")
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://192.168.180.147:9083")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // the code in the subsections below goes here

    spark.close()
  }
}

2. user_friend_count table: number of friends per user

val user_friend: DataFrame = spark.table("dwd_events.user_friend")
user_friend.show(10, false)
/*
+------+----------+----------+
|rowkey|userid |friendid |
+------+----------+----------+
| � |3939178181|826810668 |
| � |927508653 |268007813 |
| R |3341263967|1057491214|
| � |2756012832|1792996666|
| |894415506 |2419486976|
| b |1354439342|655720229 |
| � |1049125233|2716161422|
| � |3385957102|2696714993|
| � |1486659549|1461981334|
| � |4289947035|2269712784|
+------+----------+----------+ */
val user_friend_count: DataFrame = user_friend
.where(col("friendid").isNotNull and trim($"friendid") =!= "")
.groupBy("userid")
.agg(count("friendid").as("friendcount"))
.select($"userid", $"friendcount")
user_friend_count.show(10)
/*
+----------+-----------+
| userid|friendcount|
+----------+-----------+
|1890034372| 4211|
|1477282294| 1057|
| 59413733| 1341|
|2490509127| 1874|
|2264585544| 4655|
|3490623651| 2841|
|3711871763| 814|
|3292982761| 1855|
|1072430432| 2047|
|1194867611| 2526|
+----------+-----------+*/
// sanity check
user_friend_count.filter("userid==2490509127").show()
// TODO: save user_friend_count to Hive
user_friend_count.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("spark_hive.user_friend_count")
println("user_friend_count saved successfully!")

3. event_attendee_count table: attendance per event

val event_attendeeDF: DataFrame = spark.table("dwd_events.event_attendee")
event_attendeeDF.show(10, false)
val event_attendee_count: DataFrame = event_attendeeDF
.groupBy("eventid", "attendtype")
.agg(count("friendid").as("attendCount"))
.select($"eventid", $"attendtype", $"attendCount")
event_attendee_count.show(10)
/*
+----------+----------+-----------+
| eventid|attendtype|attendCount|
+----------+----------+-----------+
|2889060532| yes| 142|
|2892429018| noID| 24|
|2897294049| yes| 32|
|2905904014| invitedID| 744|
|2907906052| yes| 19|
| 291670090| yes| 16|
|2920583393| maybeID| 9|
| 292137502| invitedID| 22|
|2923173603| noID| 1|
| 292829502| yes| 26|
+----------+----------+-----------+*/
// sanity check
// event_attendee_count.filter("eventid==100022787").show()
// TODO: save event_attendee_count to Hive
event_attendee_count.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("spark_hive.event_attendee_count")
println("event_attendee_count saved successfully!")

4. event_user_state table: each invited friend's response per event

val event_attendee: DataFrame = spark.table("dwd_events.event_attendee")
event_attendee.show(10, false)
/*
+-----------------------------+----------+----------+----------+
|rowkey |eventid |friendid |attendtype|
+-----------------------------+----------+----------+----------+
|2876154686398769537invitedID |2876154686|398769537 |invitedID |
|28761546864009644924yes |2876154686|4009644924|yes |
|28761546864013573384invitedID|2876154686|4013573384|invitedID |
|28761546864024311006invitedID|2876154686|4024311006|invitedID |
|28761546864036236137noID |2876154686|4036236137|noID |
|28761546864041306043noID |2876154686|4041306043|noID |
|28761546864076390199invitedID|2876154686|4076390199|invitedID |
|28761546864094501247invitedID|2876154686|4094501247|invitedID |
|28761546864103909109yes |2876154686|4103909109|yes |
|28761546864114930709noID |2876154686|4114930709|noID |
+-----------------------------+----------+----------+----------+*/
val event_user_state: DataFrame = event_attendee
.select("eventid", "friendid", "attendtype")
.withColumn("invited", when(col("attendtype") === "invitedID", 1).otherwise(0))
.withColumn("yes", when(col("attendtype") === "yes", 1).otherwise(0))
.withColumn("no", when(col("attendtype") === "noID", 1).otherwise(0))
.withColumn("maybe", when(col("attendtype") === "maybeID", 1).otherwise(0))
.drop("attendtype")
event_user_state.show(10, false)
/*
+----------+----------+-------+---+---+-----+
|eventid |friendid |invited|yes|no |maybe|
+----------+----------+-------+---+---+-----+
|2876154686|398769537 |1 |0 |0 |0 |
|2876154686|4009644924|0 |1 |0 |0 |
|2876154686|4013573384|1 |0 |0 |0 |
|2876154686|4024311006|1 |0 |0 |0 |
|2876154686|4036236137|0 |0 |1 |0 |
|2876154686|4041306043|0 |0 |1 |0 |
|2876154686|4076390199|1 |0 |0 |0 |
|2876154686|4094501247|1 |0 |0 |0 |
|2876154686|4103909109|0 |1 |0 |0 |
|2876154686|4114930709|0 |0 |1 |0 |
+----------+----------+-------+---+---+-----+*/
// TODO: save event_user_state to Hive
event_user_state.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("spark_hive.event_user_state")
println("event_user_state saved successfully!")

5. user_event_status table: consolidation of event_user_state

val user_event_status: DataFrame = event_user_state
.groupBy("eventid", "friendid")
.agg(max("invited").as("invited"),
max("yes").as("attended"),
max("no").as("not_attended"),
max("maybe").as("maybe_attended"))
.select($"friendid".as("attend_userid"), $"eventid", $"invited", $"attended", $"not_attended", $"maybe_attended")
user_event_status.show(10, false)
/*
+-------------+----------+-------+--------+------------+--------------+
|attend_userid|eventid |invited|attended|not_attended|maybe_attended|
+-------------+----------+-------+--------+------------+--------------+
|4121945394 |2876154686|0 |0 |0 |1 |
|141447478 |2876312334|1 |0 |0 |0 |
|1540745136 |2876312334|1 |0 |0 |0 |
|1602785576 |2876312334|1 |0 |0 |0 |
|1954378660 |2876312334|1 |0 |0 |0 |
|2467890010 |2876312334|1 |0 |0 |0 |
|3854249513 |2876312334|1 |0 |0 |0 |
|3900944627 |2876312334|1 |0 |0 |0 |
|99137438 |2876312334|1 |0 |0 |0 |
|2653177815 |2876474895|1 |0 |0 |0 |
+-------------+----------+-------+--------+------------+--------------+*/
// TODO: save user_event_status to Hive
user_event_status.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("spark_hive.user_event_status")
println("user_event_status saved successfully!")

6. user_attend_event_count table: per-user counts of event invitations, attendances, and non-attendances

val user_attend_event_count: DataFrame = user_event_status.
groupBy("attend_userid")
.agg(
sum("invited").as("invited_count"),
sum("attended").as("attended_count"),
sum("not_attended").as("not_attended_count"),
sum("maybe_attended").as("maybe_attended_count"))
.select($"attend_userid", $"invited_count", $"attended_count", $"not_attended_count", $"maybe_attended_count")
user_attend_event_count.show(10, false)
// user_attend_event_count.filter(col("invited_count").isNull).show()
/*
+-------------+-------------+--------------+------------------+--------------------+
|attend_userid|invited_count|attended_count|not_attended_count|maybe_attended_count|
+-------------+-------------+--------------+------------------+--------------------+
|855128455 |6 |0 |0 |0 |
|1298918693 |70 |13 |0 |1 |
|973498510 |3 |0 |0 |0 |
|3733456205 |1 |0 |0 |0 |
|3924756713 |0 |0 |0 |1 |
|2049233271 |1 |0 |0 |0 |
|1544040576 |572 |4 |1 |0 |
|198353704 |5 |0 |0 |0 |
|1042939212 |53 |2 |1 |0 |
|989774136 |15 |0 |1 |0 |
+-------------+-------------+--------------+------------------+--------------------+*/
// TODO: save user_attend_event_count to Hive
user_attend_event_count.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("spark_hive.user_attend_event_count")
println("user_attend_event_count saved successfully!")

7. friend_attend_state table: invitation responses of each user's friends

// user_friend.show(10,false)
// user_event_status.show(10,false)
// join on friendid = attend_userid
val friend_attend_state: DataFrame = user_friend.as("uf")
.join(
user_event_status.as("ues"),
$"uf.friendid" === $"ues.attend_userid",
"left")
.select(
$"uf.userid",
$"uf.friendid",
$"ues.eventid",
when(col("ues.invited") > 0, 1).otherwise(0).as("invited"),
when(col("ues.attended") > 0, 1).otherwise(0).as("attended"),
when(col("ues.not_attended") > 0, 1).otherwise(0).as("not_attended"),
when(col("ues.maybe_attended") > 0, 1).otherwise(0).as("maybe_attended")
)
friend_attend_state.show(10, false)
/*
+----------+----------+----------+-------+--------+------------+--------------+
|userid |friendid |eventid |invited|attended|not_attended|maybe_attended|
+----------+----------+----------+-------+--------+------------+--------------+
|700005400 |1000000082|null |0 |0 |0 |0 |
|3182595870|1000061907|2025801575|0 |0 |1 |0 |
|3182595870|1000061907|2662605961|0 |0 |1 |0 |
|105163661 |1000159243|null |0 |0 |0 |0 |
|3241009765|1000174727|null |0 |0 |0 |0 |
|4146824251|1000174727|null |0 |0 |0 |0 |
|2041077011|1000174727|null |0 |0 |0 |0 |
|713793505 |1000174727|null |0 |0 |0 |0 |
|4152340748|1000177128|null |0 |0 |0 |0 |
|3189616067|1000225963|null |0 |0 |0 |0 |
+----------+----------+----------+-------+--------+------------+--------------+*/
// TODO: save friend_attend_state to Hive
friend_attend_state.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("spark_hive.friend_attend_state")
println("friend_attend_state saved successfully!")

8. friend_attend_summary table: for each user and event, how many friends were invited, attended, did not attend, or may attend

val friend_attend_summary: DataFrame = friend_attend_state
.where(col("eventid").isNotNull)
.groupBy("userid", "eventid")
.agg(
sum("invited").as("invited_friends_count"),
sum("attended").as("attended_friends_count"),
sum("not_attended").as("not_attended_friends_count"),
sum("maybe_attended").as("maybe_attended_friends_count")
)
.select($"userid", $"eventid", $"invited_friends_count", $"attended_friends_count",
$"not_attended_friends_count", $"maybe_attended_friends_count")
friend_attend_summary.show(10, false)
/*
+----------+----------+---------------------+----------------------+--------------------------+----------------------------+
|userid |eventid |invited_friends_count|attended_friends_count|not_attended_friends_count|maybe_attended_friends_count|
+----------+----------+---------------------+----------------------+--------------------------+----------------------------+
|139333642 |2153037761|1 |0 |0 |0 |
|3965867052|2693701979|542 |9 |19 |15 |
|570405433 |844053363 |1 |0 |0 |0 |
|3600799019|3480624055|63 |13 |5 |11 |
|220900628 |1684651848|24 |0 |0 |0 |
|4076593100|1902753965|73 |0 |0 |0 |
|3098511794|843844488 |27 |0 |0 |1 |
|212275010 |1900273727|1 |0 |0 |0 |
|484625739 |623506969 |136 |0 |2 |0 |
|2678093681|1486124986|136 |0 |0 |0 |
+----------+----------+---------------------+----------------------+--------------------------+----------------------------+*/
// TODO: save friend_attend_summary to Hive
friend_attend_summary.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("spark_hive.friend_attend_summary")
println("friend_attend_summary saved successfully!")

9. event_cities table: top 32 cities with the most events

val events: DataFrame = spark.table("dwd_events.events")
events.show(10, false)
/*
+----------+----------+------------------------+---------+-------+-----+--------------+------+-------+-----------+
|eventid |userid |starttime |city |state |zip |country |lat |lng |commonwords|
+----------+----------+------------------------+---------+-------+-----+--------------+------+-------+-----------+
|2926425690|596548160 |2012-06-09T05:00:00.000Z|Charlotte|NC | |United States |35.238|-80.819|8 |
|2926430949|2924318367|2012-10-18T19:00:00.003Z| | | | |null |null |1 |
|2926432319|3488255249|2012-11-11T17:30:00.003Z| | | | |null |null |0 |
|2926434242|4104060347|2012-11-15T05:00:00.003Z| | | | |null |null |1 |
|2926434816|458628699 |2012-12-24T03:00:00.003Z|Vernon |BC | |Canada |50.262|-119.27|8 |
|2926434921|4161455361|2012-11-14T00:00:00.003Z| | | | |null |null |1 |
|292643510 |4236892345|2012-11-21T19:00:00.002Z|London |England| |United Kingdom|null |null |0 |
|2926435519|2028324284|2012-12-02T02:30:00.003Z| | | | |null |null |1 |
|2926438764|1025231184|2012-09-23T00:00:00.003Z| | | | |null |null |13 |
|2926439286|3578613806|2012-11-09T00:00:00.001Z|Omaha |NE |68102|United States |41.257|-95.936|2 |
+----------+----------+------------------------+---------+-------+-----+--------------+------+-------+-----------+*/
val event_cities: DataFrame = events
.groupBy("city")
.agg(count("*").as("count"))
.orderBy(col("count").desc)
.limit(32)
.withColumn("city_1", when($"city" =!= "", $"city").otherwise("nocity"))
.drop("city")
.withColumnRenamed("city_1", "city")
.selectExpr("city", "count", "row_number() over(order by count desc) as level")
event_cities.show(32)
/*
+-------------+-------+-----+
| city| count|level|
+-------------+-------+-----+
| nocity|1557124| 1|
| New York| 43009| 2|
| Toronto| 32023| 3|
| Los Angeles| 27831| 4|
| Chicago| 21390| 5|
| London| 21187| 6|
|San Francisco| 20302| 7|
| Brooklyn| 14689| 8|
| Austin| 14528| 9|
| Houston| 12599| 10|
| Philadelphia| 12589| 11|
| Seattle| 11952| 12|
| Portland| 11672| 13|
| Washington| 11638| 14|
| San Diego| 11438| 15|
| Vancouver| 10445| 16|
| Atlanta| 10012| 17|
| Las Vegas| 8964| 18|
| Denver| 8771| 19|
| Dallas| 8590| 20|
| Miami| 8252| 21|
| Phoenix| 7335| 22|
| Boston| 6810| 23|
| Minneapolis| 6737| 24|
| Montreal| 6273| 25|
| Orlando| 6272| 26|
| San Antonio| 6025| 27|
| Ottawa| 5921| 28|
| Columbus| 5605| 29|
| Saint Louis| 5391| 30|
| Melbourne| 5280| 31|
| Sacramento| 5231| 32|
+-------------+-------+-----+*/
// TODO: save event_cities to Hive
event_cities.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("spark_hive.event_cities")
println("event_cities saved successfully!")

10. event_countries table: top 8 countries with the most events

val event_countries: DataFrame = events
.groupBy("country")
.agg(count("*").as("count"))
.orderBy(col("count").desc)
.limit(8)
.withColumn("country_1", when($"country" =!= "", $"country").otherwise("nocountries"))
.drop("country")
.withColumnRenamed("country_1", "country")
.selectExpr("country", "count", "row_number() over(order by count desc) as level")
event_countries.show()
/*
+--------------+-------+-----+
| country| count|level|
+--------------+-------+-----+
| nocountries|1533009| 1|
| United States|1068337| 2|
| Canada| 137768| 3|
|United Kingdom| 67806| 4|
| Italy| 48436| 5|
| Australia| 33137| 6|
| Germany| 18176| 7|
| Mexico| 17730| 8|
+--------------+-------+-----+*/
// TODO: save event_countries to Hive
event_countries.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("spark_hive.event_countries")
println("event_countries saved successfully!")

11. Verify the tables were saved

(4) DM layer: wide detail tables

1. dm_events database

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=true;
set hive.auto.convert.join=false;
set hive.exec.mode.local.auto=true;
create database if not exists dm_events;
use dm_events;

2. user_event_1 table: country, city, and other info for each event

create table user_event_1 stored as orc as
select t.userid,
t.eventid,
t.invited user_invited,
t.interested,
e.userid event_creator,
if(e.city <> '', e.city, 'nocity') event_city,
e.state event_state,
if(e.country <> '', e.country, 'nocountry') event_country,
e.lat,
e.lng
from dwd_events.train t
inner join dwd_events.events e
on t.eventid = e.eventid;
