赞
踩
大数据专业综合项目实践,数据集采用阿里天池的公开数据集,下载链接: 消费者商品交易调研清单
这个数据集是一个样本集,共有5000多条记录,每条记录代表一个消费者的商品交易调研信息。以下是对每个字段的描述:
消费者姓名:消费者的姓名。
年龄:消费者的年龄。
性别:消费者的性别。
月薪:消费者的月薪状况。
消费偏好:消费者在购买商品时的偏好类型,如性价比、功能性、时尚潮流、环保可持续等。
消费领域:消费者购买的商品领域,如家居用品、汽车配件、珠宝首饰、美妆护肤等。
购物平台:消费者常用的购物平台,如天猫、苏宁易购、淘宝、拼多多等。
支付方式:消费者在购物时使用的支付方式,如微信支付、货到付款、支付宝、信用卡等。
单次购买商品数量:消费者每次购买商品的数量。
优惠券获取情况:消费者在购物过程中是否获取到优惠券,如折扣优惠、免费赠品等。
购物动机:消费者购物的动机,如品牌忠诚、日常使用、礼物赠送、商品推荐等。
通过对数据集的分析及可视化,可以了解消费者的购物偏好、消费习惯和购物动机,从而为企业制定营销策略和产品定位提供参考。
Linux Ubuntu 16.04
jdk-7u75-linux-x64
eclipse-java-juno-SR2-linux-gtk-x86_64
Flume 1.5.0 -cdh5.4.5
Sqoop 1.4.5-cdh5.4.5
Hive-common-1.1.0-cdh5.4.5
Spark 1.6.0 Scala 2.10.5 kafka 0.8.2
Mysql Ver 14.14 Distrib 5.7.24 for Linux(x86_64)
打开终端,创建目录,新建文件
mkdir /data/shiyan1
gedit /data/shiyan1/shujuji
将下载内容去掉标题行并写入到shujuji文件中(或者后续在mapreduce程序中进行此步骤也行,但这里是先去掉第一行内容)
hadoop fs -mkdir /shiyan1/origindata/
hadoop fs -put /data/shiyan1/shujuji /shiyan1/origindata/*
先创建一个清洗后文件保留的目录
hadoop fs -mkdir /shiyan1/cleandata/
在eclipse中新建一个项目,再新建一个目录folder,命名为lib,导入项目所需jar包。具体操作:选中所有jar包(按住Shift快捷键),右键点击Add to Build Path。(项目jar包在我的博客主页资源里,需要自取)
新建一个类名为Clean,写入下述代码
package my.clean; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Clean { public static class doMapper extends Mapper<Object, Text, Text, Text> { @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] arr = value.toString().split(","); StringBuilder one = new StringBuilder(); one.append(arr[1]); one.append("\t"); one.append(arr[2]); one.append("\t"); one.append(arr[3]); one.append("\t"); one.append(arr[4]); one.append("\t"); one.append(arr[5]); one.append("\t"); one.append(arr[9]); one.append("\t"); one.append(arr[10]); context.write(new Text(one.toString()), new Text("")); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(); job.setJobName("Clean"); job.setJarByClass(Clean.class); job.setMapperClass(doMapper.class); // job.setReducerClass(doReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path in = new Path("hdfs://localhost:9000//shiyan1/origindata"); Path out = new Path("hdfs://localhost:9000//shiyan1/cleandata"); FileInputFormat.addInputPath(job, in); FileOutputFormat.setOutputPath(job, out); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
右键项目,点击run on hadoop(注意要先开启hadoop)
打开终端,输入命令,查看结果,再将清洗后的数据下载到本地
hadoop fs -cat /shiyan1/cleandata/part-r-00000 >> /data/shiyan1/cleandata
create database behavior;
use behavior;
create table xiaofei (age int,sex string,salary int,consumelike string,consumearea string, coupon string,shoppeupose string) row format delimited fields terminated by '\t' ;
load data local inpath '/data/shiyan1/cleandata' into table xiaofei;
在hive中,执行查询操作,验证数据是否导入成功。若没有数据,看看是否漏掉哪条命令
select * from xiaofei limit 10;
需求1:
统计消费者商品购物调研单中,中老年人较大(35岁做划分)与年轻人购物人数与比例:
select age,count(*) as num from(
select case when age>=35 then 1 when age<35 then 0 end as age from xiaofei
) t
group by age
(发现比例大致为为1:2)
需求2:统计不同年龄的消费偏好和消费动机,挖掘出现最多的次数,发现不同年龄的人群消费追求是什么
select age,consumelike,shoppurpose,nums from (
select * ,row_number()over(partition by age order by nums desc) as rank from (
select age,consumelike,shoppurpose,nums from (
select age,consumelike,shoppurpose ,count(*) as nums from xiaofei
group by age,consumelike,shoppurpose
) t
where nums>=2
) p
) m
where rank = 1
(统计结果发现,有些年龄段消费追求较集中,而有些年龄段消费追求比较广泛,消费追求差别也较大)
需求3:统计不同性别,对购物时用优惠券的关注情况(各取前三位)
select sex ,coupon from (
select *, row_number()over(partition by sex order by num desc) as rank from (
select sex,coupon,count(*) as num from xiaofei
group by sex,coupon
) as t
) as p
where rank <=3
(统计结果:不管男性还是女性,免费赠品对他们的诱惑还是很大的,除此之外,女性购物喜欢用优惠券,而男性则是有购买欲望就买了,对优惠券的使用力度不是很大)
这里新建了三个表,用于保存上面查询的结果
create table agecount(age int,num int) row format delimited fields terminated by '\t' ;
create table agelike(age int,consumelike string,consumearea string,num int) row format delimited fields terminated by '\t' ;
create table sexcoupon(sex string,counpon string) row format delimited fields terminated by '\t' ;
再在查询语句前添加insert into table ,以第一个需求为例,后面类推
insert into table agecount select age,count(*) as num from(
select case when age>=35 then 1 when age<35 then 0 end as age from xiaofei
) t
group by age
sudo service mysql start
这里输入你的用户名和密码
mysql -u root -p
创建库,并在库下创建表
CREATE DATABASE IF NOT EXISTS behavior DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
use behavior;
create table agecount (age int ,num int);
create table agecount (age int ,consumelike varchar(200),consumearea varchar(200),num int);
create table sexcoupon(sex varchar(20),shoppurpose varchar(200));
sqoop export --connect jdbc:mysql://localhost:3306/behavior?characterEncoding=UTF-8 --username root --password strongs --table agecount --export-dir /user/hive/warehouse/behavior.db/agecount/000000_0 --input-fields-terminated-by '\t'
sqoop export --connect jdbc:mysql://localhost:3306/behavior?characterEncoding=UTF-8 --username root --password strongs --table agelike --export-dir /user/hive/warehouse/behavior.db/agelike/000000_0 --input-fields-terminated-by '\t'
sqoop export --connect jdbc:mysql://localhost:3306/behavior?characterEncoding=UTF-8 --username root --password strongs --table sexcoupon --export-dir /user/hive/warehouse/behavior.db/sexcoupon/000000_0 --input-fields-terminated-by '\t'
select * from sexcoupon;
mkdir /data/shiyan1/realtime/datasource
mkdir /data/shiyan1/realtime/datarandom
mkdir /data/shiyan1/realtime/shellrealtime
首先进入到编辑模式,如果你发现不识别gedit命令,可以尝试使用vim或者vi
gedit /data/shiyan1/realtime/shellrealtime/time.sh
写入下述内容
#!/bin/bash
file_count=1
while true;do
for i in {1..5} ; do
if read -r line; then
echo "$line" >> /data/shiyan1/realtime/datarandom/file_${file_count}.txt
else
break 2
fi
done
((file_count++))
sleep 10
done < /data/shiyan1/realtime/datasource/source
下面配置flume的conf文件,测试flume是否可以正常工作
gedit spooldir_mem_logger.conf
将以下Flume的配置信息添加到文件里,然后保存退出。使其实现功能为监控/data/shiyan1/realtime/datarandom目录,并将读取到的文件输出到console界面。
agent1.sources=src
agent1.channels=ch
agent1.sinks=des
agent1.sources.src.type = spooldir
agent1.sources.src.restart = true
agent1.sources.src.spoolDir =/data/shiyan1/realtime/datarandom
agent1.channels.ch.type=memory
agent1.sinks.des.type = logger
agent1.sources.src.channels=ch
agent1.sinks.des.channel=ch
配置好spooldir_mem_logger.conf文件后,切换到Flume安装目录下并启动Flume。(说明下:这里/data/edu6/是我配置flume的conf文件的目录)
cd /apps/flume
flume-ng agent -c /data/edu6/ -f /data/edu6/spooldir_mem_logger.conf -n agent1
-Dflume.root.logger=DEBUG,console
运行上面写好的time.sh脚本程序,发现目录下文件有如下变化。
若检测成功,可以发现文件结尾加了.COMPLETED后缀
接着请删除/data/shiyan1/realtime/datarandom目录下的所有文件,防止影响flume内容监测。
前置步骤:开启hadoop服务,zookeeper服务,接着进入kafka安装目录,启动Kafka-server端。
cd /apps/kafka
bin/kafka-server-start.sh config/server.properties
创建topic,命名为flumesendkafka。
bin/kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--topic flumesendkafka \
--partitions 1
查看当前kafka中,都有哪些topic
/apps/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
cd /apps/kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flumesendkafka
这里的localhost为Kafka的IP,9092为broker节点的端口。用户可以在console界面上,输入信息,交给producer进行处理,并发给consumer。
cd /apps/kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flumesendkafka
--from-beginning
kafka-console-consumer.sh依然需要加一些参数,比如ZooKeeper的IP及端口、主题名称、读取数据位置等。
在执行kafka-console-producer.sh命令的界面中,随便输入几行文字,按回车。可以看到在consumer端,会将同样的内容,输出出来
新建Scala Project,(如需项目所用jar包,请去我的博客资源里自行下载)
统计每一段时间,共有多少条新数据,做一个实时计算
package my.streaming import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.dstream.InputDStream import java.sql.DriverManager import java.sql.ResultSet import java.sql.Connection import java.sql.PreparedStatement import java.text.SimpleDateFormat import java.util.Date object JianKong { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("jiankong").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(4)) ssc.checkpoint("checkpoint") val topics = Set("flumesendkafka") val brokers = "localhost:9092" val zkQuorum = "localhost:2181" val kafkaParams = Map[String, String]( "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder") val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2) lines.foreach(line => { var strs = line.collect() println(strs.size) var finalNum = 0 for (str: String <- strs) { /**Use Fastjson to parse jsonString!*/ println("finalNum : " + finalNum + "#"+ str) if (!str.equals("")) { finalNum = finalNum + 5 } } println("finalNum: " + finalNum) var now: Date = new Date() val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") var creationtime = dateFormat.format(now) val db_host = "localhost" val db_name = "realtimebase" val db_user = "root" val db_passwd = "strongs" val db_connection_str = "jdbc:mysql://" + db_host + ":3306/" + db_name + "?user=" + db_user + "&password=" + db_passwd var conn: Connection = null var ps: PreparedStatement = null val sql = "insert into jiankong (creationtime, num) values (?, ?)" try { conn = DriverManager.getConnection(db_connection_str) ps = conn.prepareStatement(sql) ps.setString(1, creationtime) ps.setInt(2, finalNum) ps.executeUpdate() } catch { case e: Exception => println("MySQL Exception") } finally { if (ps != null) { ps.close() } if (conn != null) { conn.close() } } }) ssc.start() ssc.awaitTermination() ssc.stop() } }
sudo service mysql start
mysql -u root -p
CREATE DATABASE IF NOT EXISTS realtimebase DEFAULT CHARSET utf8 \
COLLATE utf8_general_ci;
use realtimebase
create table jiankong (creationtime datetime,num int);
这样,通过sparkstreaming处理过的程序,就会将结果写入到Mysql数据库中,最后查看每一段时间共有多少条评论被记录
cd /apps/kafka
bin/kafka-server-start.sh config/server.properties
cd /apps/flume
flume-ng agent -c /data/edu6/ -f /data/edu6/spooldir_mem_logger.conf -n agent1
-Dflume.root.logger=DEBUG,console
/data/shiyan1/realtime/shellrealtime/time.sh
发现Mysql表中有相应内容,实时处理结束。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。