当前位置:   article > 正文

pyspark英国零售电商用户画像(RFM)_基于spark的rfm模型

基于spark的rfm模型

1 环境搭建

本次作业使用的环境和软件如下:

(1)Linux操作系统:Ubuntu 16.04
(2)Python:3.5.2
(3)Hadoop:3.1.3
(4)Spark:2.4.0

具体的安装步骤参考林子雨的教程基于零售交易数据的Spark数据处理与分析

2 数据处理

数据处理及分析参考链接在这里

数据清洗的操作主要有:

  1. 数据缺失值处理:删除CustomerID为空值的135037条数据
  2. 数据去重:删除数据集中重复的、无价值的数据5268条
  3. 时间序列的处理:将InvoiceDate转换为时间序列InvoiceTime
  4. 异常值处理:删除Quantity数量和UnitPrice单价中小于等于0的值

处理好的数据还余392692条,保存为sales_success.csv文件。

然后将数据上传到hdfs:(假设你的数据保存在/home/hadoop/下载/中)

./bin/hdfs dfs -put /home/hadoop/下载/sales_success.csv
  • 1

3 用户画像

本节需要安装jupyter,参考教程也是林子雨老师的使用Jupyter Notebook调试PySpark程序

用户画像参考链接点击

3.1 读取数据

from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, concat, lit, substring
from pyspark.sql.functions import col, max as max_
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import rank,sum,to_date, datediff, max, count,to_utc_timestamp, unix_timestamp,expr,mean
import pyspark.sql.functions as F
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
spark = SparkSession \
    .builder \
    .appName("pySpark RFM") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
  • 1
  • 2
  • 3
  • 4
  • 5
df = spark.read.format('com.databricks.spark.csv').\
                       options(header='true', \
                       inferschema='true').\
            load("sales_success.csv",header=True);
  • 1
  • 2
  • 3
  • 4

查看数据前5行:

df.show(5)
  • 1

在这里插入图片描述

3.2 RFM用户画像

3.2.1 时间序列

df = df.withColumn("InvoiceTime", to_timestamp(col("InvoiceTime"), 'yyyy-MM-dd HH:mm:ss'))
  • 1
df.show(5)
  • 1

在这里插入图片描述

3.2.2 计算R、F、M值

  • 最近一次的消费时间(Recency):
    一般考虑最近一次消费到某固定时间点的间隔时长,这个维度需要结合业务的特点加以考虑。基于历史数据的分析,大概率上满足,消费频率越高的用户粘性更大,也更可能成为忠实用户。此时,如何预防用户出现厌倦流失等状况,如何稳定住这批客户是需要关注的问题。而对于那些早期有过消费但很快流失的客户,则需要从更多的数据中尝试挖掘出共性。

  • 一段时间内的消费频次(Frequency):
    一段时间内,对于购买消费频次更高的用户,其忠诚度更高,在很大概率上也更容易产生下一次消费行为。基于历史交易数据分析用户的消费频次数据,一般情况下也是满足周期性或者比较稳定的规律;如果出现了比较异常的数据,可能更需要在业务角度提供推荐、优惠、回访等方式进行关注。

  • 一段时间内的累计消费额(Monetary):
    一般是累积某时间段内用户消费的总金额,比如一年内,一个月内的消费总额;一般的,消费越多的用户有更大概率去购买更用心和更高级的产品;这类用户可能在消费观上更愿意接受新产品,此时通过建立合适的 VIP 机制或者类似的用户回馈机制,可以提高用户的体验。

  • 计算时间间距

rfm = df\
.groupBy("CustomerID")\
.agg(max_("InvoiceTime").alias('LastDate'))
  • 1
  • 2
  • 3
max_date = df.agg({"InvoiceTime": "max"}).collect()[0][0]
rfm = rfm.withColumn("RecentDate", lit(max_date))
  • 1
  • 2
  • R值
rfm = rfm.withColumn("recency", datediff(col("RecentDate"), col("LastDate")))
rfm = rfm.select("CustomerID", "recency")
  • 1
  • 2
  • M值
monetary = df.groupBy("CustomerID").agg(sum("SumPrice").alias('monetary'))
  • 1
  • F值
freq = df.groupBy("CustomerID").agg(count("StockCode").alias('frequency'))
  • 1
  • 合并起来
rfm = rfm.join(monetary, ["CustomerID"]).join(freq, ["CustomerID"])

rfm.show(5)
  • 1
  • 2
  • 3

在这里插入图片描述

3.2.3 建立用户评分机制

按照传统的 RFM 客户细分,根据用户消费的频繁程度和消费支出的金额,实现价值划分。为用户分组,主要将 RFM 三个指标分别分为“高”和“低”两种。

  • 先计算出各个指标的25%、50%、75%;
cols = ['Recency','Frequency','Monetary']
rfm.summary().show()
  • 1
  • 2

在这里插入图片描述

  • 根据各个指标中值进行赋值,小于25%赋值为1,小于50%赋值为2,小于75%赋值为3,大于75%赋值为4。
def RScore(x):
    if  x <= 17:
        return 1
    elif x<= 50:
        return 2
    elif x<= 143:
        return 3
    else:
        return 4

def FScore(x):
    if  x <= 1:
        return 4
    elif x <= 41:
        return 3
    elif x <= 98:
        return 2
    else:
        return 1

def MScore(x):
    if  x <= 306:
        return 4
    elif x <= 669:
        return 3
    elif x <= 1661:
        return 2
    else:
        return 1

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

R_udf = udf(lambda x: RScore(x), StringType())
F_udf = udf(lambda x: FScore(x), StringType())
M_udf = udf(lambda x: MScore(x), StringType())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 将各个指标的权值保存为新的特征r_seg、f_seg、m_seg
rfm_seg = rfm.withColumn("r_seg", R_udf("Recency"))
rfm_seg = rfm_seg.withColumn("f_seg", F_udf("Frequency"))
rfm_seg = rfm_seg.withColumn("m_seg", M_udf("Monetary"))
rfm_seg.show(5)
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

  • 对各个指标的权值进行整合
rfm_seg = rfm_seg.withColumn('RFMScore',
                             F.concat(F.col('r_seg'),F.col('f_seg'), F.col('m_seg')))

rfm_seg = rfm_seg.withColumn('r_seg', col('r_seg').cast('int'))
rfm_seg = rfm_seg.withColumn('f_seg', col('f_seg').cast('int'))
rfm_seg = rfm_seg.withColumn('m_seg', col('m_seg').cast('int'))

rfm_seg = rfm_seg.withColumn('score', col('r_seg') + col('f_seg') + col('m_seg'))

rfm_seg.show(5)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在这里插入图片描述

  • 计算各个权值的平均值,作为指标
rfm_seg.summary().show()
  • 1

在这里插入图片描述

  • 各个指标中高于该指标标准的标签设为“高”, 低于该指标标准的标签设为“低”;
def RS(x):
    if x > 2.49:
        return '高'
    else:
        return '低'

def FS(x):
    if x > 2.27:
        return '高'
    else:
        return '低'

def MS(x):
    if x > 2.5:
        return '高'
    else:
        return '低'

RSc = udf(lambda x: RS(x), StringType())
FSc = udf(lambda x: FS(x), StringType())
MSc = udf(lambda x: MS(x), StringType())

rfm_seg = rfm_seg.withColumn("R", RSc("r_seg"))
rfm_seg = rfm_seg.withColumn("F", FSc("f_seg"))
rfm_seg = rfm_seg.withColumn("M", MSc("m_seg"))
rfm_seg.show(5)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

在这里插入图片描述

  • 根据结果组合“高”“低”标签进行用户价值细分,比如各标签都是“高”的则划分为高价值客户。
# 将三个指标结合
rfm_seg = rfm_seg.withColumn('RFM', concat("R", "F", "M"))
rfm_seg.show(5)
  • 1
  • 2
  • 3

在这里插入图片描述

def rfm2grade(x):
    if x=='高高高':
        return '高价值客户'
    elif x=='高低高':
        return '重点发展客户'
    elif x=='低高高':
        return '重点保持客户'
    elif x=='低低高':
        return '重点挽留客户'
    elif x=='高高低':
        return '一般价值客户'
    elif x=='高低低':
        return '一般发展客户'
    elif x=='低高低':
        return '一般保持客户'
    else:
        return '一般挽留客户'  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
score_udf = udf(lambda x: rfm2grade(x), StringType())
rfm_seg = rfm_seg.withColumn("General_Segment", score_udf("RFM"))

rfm_seg.show(5)
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

3.2.3 RFM用户画像结果

stats = rfm_seg.groupby('General_Segment').agg(
        mean('Recency').alias('Mean_Recency'),
        mean('Frequency').alias('Mean_Frequency'),
        mean('Monetary').alias('Mean_Monetary'),
        count('*').alias('Count')
    )
stats.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述

from pyspark.sql import SparkSession
from pyecharts import options as opts
from pyecharts.charts import Pie

segment_counts = stats.groupBy("General_Segment").count().collect()

# 将结果转换为两个列表:segments和counts
segments = [row.General_Segment for row in segment_counts]
counts = stats.select('Count').rdd.flatMap(lambda x: x).collect()

# 创建Pie对象
pie = (
    Pie()
    .add("", [list(z) for z in zip(segments, counts)])  # 添加数据
    .set_colors(['#FF4500', '#FFA500', '#FFD700', '#ADFF2F', '#1E90FF'])  # 设置颜色
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c} ({d}%)"))  # 设置数据标签格式
)

# 渲染图表并保存到HTML文件
pie.render("general_segment_pie_chart.html")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

在这里插入图片描述

3.3 KMeans用户画像

  • 也可以使用KMeans聚类来做用户划分标准

3.3.1

  • 建立特征矩阵
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

def transData(data):
    return data.rdd.map(lambda r: [r[0],Vectors.dense(r[1:])]).toDF(['CustomerID','rfm'])

transformed= transData(rfm)
transformed.show(5)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在这里插入图片描述

  • 标准化
from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="rfm",\
         outputCol="features")
scalerModel =  scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)
scaledData.show(5,False)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述

3.3.2 K值

  • 手肘法
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, percent_rank, lit
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType
from functools import reduce  # For Python 3.x

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
import numpy as np
cost = np.zeros(20)
for k in range(2,20):
    kmeans = KMeans()\
            .setK(k)\
            .setSeed(1) \
            .setFeaturesCol("features")\
            .setPredictionCol("cluster")

    model = kmeans.fit(scaledData)
    cost[k] = model.computeCost(scaledData)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import seaborn as sbs
from matplotlib.ticker import MaxNLocator

fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,20),cost[2:20], marker = "o")
ax.set_xlabel('k')
ax.set_ylabel('cost')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在这里插入图片描述

  • 轮廓系数法
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

silhouette_score=[]
 
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='features', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')
 
for i in range(2,10):
    KMeans_algo=KMeans(featuresCol='features', k=i)
    KMeans_fit=KMeans_algo.fit(scaledData)
    output=KMeans_fit.transform(scaledData)
    score=evaluator.evaluate(output)
    silhouette_score.append(score)
    print("Silhouette Score:",score)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
# 可视化轮廓分数
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,10),silhouette_score)
ax.set_xlabel('k')
ax.set_ylabel('cost')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述
通过手肘法和轮廓系数法发现,比较合适的k值在5-8之间,在这里我们选取k=6作为最佳值。K值的选取并没有固定的标准,在这里我们选取k=6的原因是能够取得较大的轮廓系数。

3.3.3 用户画像

k = 6
kmeans = KMeans().setK(k).setSeed(1)
model = kmeans.fit(scaledData)
# Make predictions
predictions = model.transform(scaledData)
predictions.show(6,False)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

stat = predictions.groupby('prediction').agg(
        count('*').alias('Count')
    )
stat.show()
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

from pyspark.sql import SparkSession
from pyecharts import options as opts
from pyecharts.charts import Pie

segment_counts = stat.groupBy("prediction").count().collect()

# 将结果转换为两个列表:segments和counts
segments = [row.prediction for row in segment_counts]
counts = stats.select('Count').rdd.flatMap(lambda x: x).collect()

# 创建Pie对象
pie = (
    Pie()
    .add("", [list(z) for z in zip(segments, counts)])  # 添加数据
    .set_colors(['#FF4500', '#FFA500', '#FFD700', '#ADFF2F', '#1E90FF'])  # 设置颜色
    .set_global_opts(title_opts=opts.TitleOpts(title="prediction Distribution"))  # 设置标题
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c} ({d}%)"))  # 设置数据标签格式
)

# 渲染图表并保存到HTML文件
pie.render("prediction_pie_chart.html")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

在这里插入图片描述
到这里我们就做完啦!你真棒!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/681890
推荐阅读
相关标签
  

闽ICP备14008679号