The environment and software used for this assignment:
(1) Linux OS: Ubuntu 16.04
(2) Python: 3.5.2
(3) Hadoop: 3.1.3
(4) Spark: 2.4.0
For the installation steps, see Lin Ziyu's tutorial 基于零售交易数据的Spark数据处理与分析 (Spark Data Processing and Analysis Based on Retail Transaction Data).
The reference link for the data processing and analysis part is here.
The data was cleaned first; for the kind of operations involved, see the sketch below. After cleaning, 392,692 records remain, saved as sales_success.csv.
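The individual cleaning steps are not listed here. Purely as an illustration (not the original cleaning code), assuming the raw file uses the standard Online Retail columns (InvoiceNo, StockCode, Quantity, InvoiceDate, UnitPrice, CustomerID, Country), cleaning that produces the columns used later (CustomerID, StockCode, InvoiceTime, SumPrice) might look roughly like this:
# Illustrative sketch only -- the column names, rules, and file names are assumptions,
# not the tutorial's actual cleaning code.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("clean retail data").getOrCreate()
raw = spark.read.csv("online_retail.csv", header=True, inferSchema=True)   # hypothetical raw file name
cleaned = (raw
           .dropna(subset=["CustomerID"])                                  # drop rows with no customer ID
           .filter((col("Quantity") > 0) & (col("UnitPrice") > 0))         # drop cancellations / non-positive prices
           .withColumn("SumPrice", col("Quantity") * col("UnitPrice"))     # line amount, later summed as Monetary
           .withColumnRenamed("InvoiceDate", "InvoiceTime"))               # timestamp column used later
cleaned.write.csv("sales_success_out", header=True)                        # hypothetical output directory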
Then upload the data to HDFS (assuming the file is saved under /home/hadoop/下载/):
./bin/hdfs dfs -put /home/hadoop/下载/sales_success.csv
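You can check that the upload succeeded by listing your HDFS home directory:
./bin/hdfs dfs -ls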
This section requires Jupyter; for setup, see Lin Ziyu's tutorial 使用Jupyter Notebook调试PySpark程序 (Using Jupyter Notebook to Debug PySpark Programs).
The reference link for the user-profiling part is here.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, concat, count, datediff, lit, mean, sum,
                                   to_timestamp, max as max_)
import pyspark.sql.functions as F
spark = SparkSession \
    .builder \
    .appName("pySpark RFM") \
    .getOrCreate()
df = spark.read.csv("sales_success.csv", header=True, inferSchema=True)
View the first 5 rows of the data:
df.show(5)
df = df.withColumn("InvoiceTime", to_timestamp(col("InvoiceTime"), 'yyyy-MM-dd HH:mm:ss'))
df.show(5)
Recency (time since the most recent purchase):
Recency is usually measured as the interval between a customer's most recent purchase and some fixed reference date, and how to interpret it depends on the characteristics of the business. Based on historical data, customers who have purchased recently and purchase often tend to be stickier and are more likely to become loyal customers; for them, the key questions are how to prevent fatigue and churn and how to keep the relationship stable. For customers who made early purchases but churned quickly, it is worth mining more data for common traits.
Frequency (number of purchases within a period):
Within a given period, customers who purchase more frequently are more loyal and, with high probability, more likely to purchase again. Frequency derived from historical transactions usually follows a periodic or fairly stable pattern; if a customer's behaviour deviates noticeably from it, the business side may need to respond with recommendations, promotions, or follow-up contact.
Monetary (cumulative spend within a period):
Monetary is usually the total amount a customer spent within some window, such as the past year or the past month. In general, customers who spend more are more likely to buy higher-end products and tend to be more receptive to new ones; a well-designed VIP or similar loyalty mechanism can improve the experience for this group.
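Concretely, the code below computes the three metrics per customer as:
recency(c)   = datediff( max(InvoiceTime) over all records, max(InvoiceTime) for customer c )
frequency(c) = count of order lines (StockCode rows) for customer c
monetary(c)  = sum of SumPrice for customer c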
Compute the time interval (Recency)
# For each customer, find the date of their most recent purchase
rfm = df \
    .groupBy("CustomerID") \
    .agg(max_("InvoiceTime").alias('LastDate'))
# Use the latest purchase date in the whole dataset as the reference point
max_date = df.agg({"InvoiceTime": "max"}).collect()[0][0]
rfm = rfm.withColumn("RecentDate", lit(max_date))
# Recency = days between the reference date and the customer's last purchase
rfm = rfm.withColumn("recency", datediff(col("RecentDate"), col("LastDate")))
rfm = rfm.select("CustomerID", "recency")
monetary = df.groupBy("CustomerID").agg(sum("SumPrice").alias('monetary'))
freq = df.groupBy("CustomerID").agg(count("StockCode").alias('frequency'))
rfm = rfm.join(monetary, ["CustomerID"]).join(freq, ["CustomerID"])
rfm.show(5)
Following the traditional RFM segmentation, customers are divided into value groups according to how often they buy and how much they spend. To form the groups, each of the three RFM indicators is split into "high" (高) and "low" (低).
cols = ['Recency','Frequency','Monetary']
rfm.summary().show()
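The cutoff values used in the scoring functions below (17/50/143 for recency, 1/41/98 for frequency, 306/669/1661 for monetary) appear to follow the 25%/50%/75% rows of the summary() output above. If you prefer to read the quartiles off programmatically rather than from the printed table, approxQuantile gives the same information; a minimal sketch (column names as in the rfm DataFrame built above):
# Approximate 25%, 50% and 75% quantiles of each RFM column; the last argument is the relative error
for c in ["recency", "frequency", "monetary"]:
    print(c, rfm.approxQuantile(c, [0.25, 0.5, 0.75], 0.01))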
def RScore(x):
    if x <= 17:
        return 1
    elif x <= 50:
        return 2
    elif x <= 143:
        return 3
    else:
        return 4

def FScore(x):
    if x <= 1:
        return 4
    elif x <= 41:
        return 3
    elif x <= 98:
        return 2
    else:
        return 1

def MScore(x):
    if x <= 306:
        return 4
    elif x <= 669:
        return 3
    elif x <= 1661:
        return 2
    else:
        return 1

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

R_udf = udf(lambda x: RScore(x), StringType())
F_udf = udf(lambda x: FScore(x), StringType())
M_udf = udf(lambda x: MScore(x), StringType())
rfm_seg = rfm.withColumn("r_seg", R_udf("Recency"))
rfm_seg = rfm_seg.withColumn("f_seg", F_udf("Frequency"))
rfm_seg = rfm_seg.withColumn("m_seg", M_udf("Monetary"))
rfm_seg.show(5)
rfm_seg = rfm_seg.withColumn('RFMScore',
                             F.concat(F.col('r_seg'), F.col('f_seg'), F.col('m_seg')))
rfm_seg = rfm_seg.withColumn('r_seg', col('r_seg').cast('int'))
rfm_seg = rfm_seg.withColumn('f_seg', col('f_seg').cast('int'))
rfm_seg = rfm_seg.withColumn('m_seg', col('m_seg').cast('int'))
rfm_seg = rfm_seg.withColumn('score', col('r_seg') + col('f_seg') + col('m_seg'))
rfm_seg.show(5)
rfm_seg.summary().show()
# Thresholds roughly correspond to the mean r_seg / f_seg / m_seg values from the summary() above
def RS(x):
    if x > 2.49:
        return '高'   # high
    else:
        return '低'   # low

def FS(x):
    if x > 2.27:
        return '高'
    else:
        return '低'

def MS(x):
    if x > 2.5:
        return '高'
    else:
        return '低'

RSc = udf(lambda x: RS(x), StringType())
FSc = udf(lambda x: FS(x), StringType())
MSc = udf(lambda x: MS(x), StringType())

rfm_seg = rfm_seg.withColumn("R", RSc("r_seg"))
rfm_seg = rfm_seg.withColumn("F", FSc("f_seg"))
rfm_seg = rfm_seg.withColumn("M", MSc("m_seg"))
rfm_seg.show(5)
# Combine the three indicators into a single label
rfm_seg = rfm_seg.withColumn('RFM', concat("R", "F", "M"))
rfm_seg.show(5)
def rfm2grade(x):
    if x == '高高高':
        return '高价值客户'      # high-value customer
    elif x == '高低高':
        return '重点发展客户'    # key development customer
    elif x == '低高高':
        return '重点保持客户'    # key retention customer
    elif x == '低低高':
        return '重点挽留客户'    # key win-back customer
    elif x == '高高低':
        return '一般价值客户'    # general value customer
    elif x == '高低低':
        return '一般发展客户'    # general development customer
    elif x == '低高低':
        return '一般保持客户'    # general retention customer
    else:
        return '一般挽留客户'    # general win-back customer
score_udf = udf(lambda x: rfm2grade(x), StringType())
rfm_seg = rfm_seg.withColumn("General_Segment", score_udf("RFM"))
rfm_seg.show(5)
stats = rfm_seg.groupby('General_Segment').agg(
    mean('Recency').alias('Mean_Recency'),
    mean('Frequency').alias('Mean_Frequency'),
    mean('Monetary').alias('Mean_Monetary'),
    count('*').alias('Count')
)
stats.show()
from pyecharts import options as opts
from pyecharts.charts import Pie

# Collect each segment name and its customer count directly from the stats DataFrame,
# so the labels and values stay aligned
rows = stats.select("General_Segment", "Count").collect()
segments = [row.General_Segment for row in rows]
counts = [row.Count for row in rows]

# Build the Pie chart
pie = (
    Pie()
    .add("", [list(z) for z in zip(segments, counts)])                        # add the data pairs
    .set_colors(['#FF4500', '#FFA500', '#FFD700', '#ADFF2F', '#1E90FF'])      # colour palette
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c} ({d}%)"))  # label format
)

# Render the chart and save it to an HTML file
pie.render("general_segment_pie_chart.html")
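If you are running inside Jupyter Notebook (as set up earlier), pyecharts can also show the chart inline instead of only writing an HTML file:
# Render the pie chart directly in the notebook output cell
pie.render_notebook()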
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
def transData(data):
    # Pack (recency, monetary, frequency) into a dense feature vector per customer
    return data.rdd.map(lambda r: [r[0], Vectors.dense(r[1:])]).toDF(['CustomerID', 'rfm'])

transformed = transData(rfm)
transformed.show(5)
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol="rfm", outputCol="features")
scalerModel = scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)
scaledData.show(5,False)
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np
# Elbow method: fit KMeans for k = 2..19 and record the within-set sum of squared errors
cost = np.zeros(20)
for k in range(2, 20):
    kmeans = KMeans() \
        .setK(k) \
        .setSeed(1) \
        .setFeaturesCol("features") \
        .setPredictionCol("cluster")
    model = kmeans.fit(scaledData)
    cost[k] = model.computeCost(scaledData)   # computeCost is available in Spark 2.4
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,20),cost[2:20], marker = "o")
ax.set_xlabel('k')
ax.set_ylabel('cost')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.show()
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Silhouette method: evaluate clusterings for k = 2..9
silhouette_score = []
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='features',
                                metricName='silhouette', distanceMeasure='squaredEuclidean')
for i in range(2, 10):
    KMeans_algo = KMeans(featuresCol='features', k=i)
    KMeans_fit = KMeans_algo.fit(scaledData)
    output = KMeans_fit.transform(scaledData)
    score = evaluator.evaluate(output)
    silhouette_score.append(score)
    print("Silhouette Score:", score)
# Visualize the silhouette scores
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1, 1, figsize=(8, 6))
ax.plot(range(2, 10), silhouette_score)
ax.set_xlabel('k')
ax.set_ylabel('silhouette score')
plt.show()
The elbow method and the silhouette scores both suggest that a reasonable k lies between 5 and 8; here we take k = 6. There is no fixed rule for choosing k; we pick k = 6 because it achieves a relatively high silhouette score.
k = 6
kmeans = KMeans().setK(k).setSeed(1)
model = kmeans.fit(scaledData)
# Make predictions
predictions = model.transform(scaledData)
predictions.show(6,False)
stat = predictions.groupby('prediction').agg(
    count('*').alias('Count')
)
stat.show()
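The counts above only show the size of each cluster. As an optional sketch (not part of the original analysis), you can join the cluster labels back onto the rfm table to see the average recency/frequency/monetary per cluster, mirroring the General_Segment summary earlier:
# Optional sketch: average raw RFM values per KMeans cluster
cluster_profile = predictions.select("CustomerID", "prediction") \
    .join(rfm, ["CustomerID"]) \
    .groupBy("prediction") \
    .agg(mean("recency").alias("Mean_Recency"),
         mean("frequency").alias("Mean_Frequency"),
         mean("monetary").alias("Mean_Monetary"),
         count("*").alias("Count"))
cluster_profile.orderBy("prediction").show()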
from pyecharts import options as opts
from pyecharts.charts import Pie

# Collect each cluster label and its customer count directly from the stat DataFrame,
# so the labels and values stay aligned
rows = stat.select("prediction", "Count").collect()
segments = [str(row.prediction) for row in rows]   # cluster id as a string label
counts = [row.Count for row in rows]

# Build the Pie chart
pie = (
    Pie()
    .add("", [list(z) for z in zip(segments, counts)])                            # add the data pairs
    .set_colors(['#FF4500', '#FFA500', '#FFD700', '#ADFF2F', '#1E90FF'])          # colour palette
    .set_global_opts(title_opts=opts.TitleOpts(title="prediction Distribution"))  # chart title
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c} ({d}%)"))      # label format
)

# Render the chart and save it to an HTML file
pie.render("prediction_pie_chart.html")
And that's it, we're done! Nice work!