These are unpolished Spark notes; please read them with patience.
Update: reorganized 2024-04-16.
The goal of this article is to run an approximate nearest neighbor (ANN) search over the anime_ids a user has watched and recommend similar anime_ids to that user.
To get there, we implement a few small modules:
1) Build an adjacency matrix from anime_ids.
2) Build the transition matrix and the entry (initial) distribution used by DeepWalk.
3) Generate fixed-length samples with the DeepWalk algorithm.
4) Train dense vectors with Spark's Word2Vec.
5) Cache each user_id's embedding in Redis.
6) Run the approximate nearest neighbor (ANN) lookup with locality-sensitive hashing (LSH).
Original dataset:
https://www.kaggle.com/datasets/CooperUnion/anime-recommendations-database
Imports
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
import numpy as np
from collections import defaultdict
Spark needs a JDK to run, so set the JAVA_HOME environment variable:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/jre-1.8.0-openjdk"
Rows and columns are anime_ids; each cell holds the number of user_ids who watched both the row anime_id and the column anime_id.
The adjacency matrix looks like this (example):

| anime_id | 20 | 81 | 170 | 263 |
|---|---|---|---|---|
| 20 | 0 | 1 | 2 | 2 |
| 81 | 1 | 0 | … | … |
| 170 | 2 | … | 0 | … |
| 263 | 2 | … | … | 0 |
spark = SparkSession.builder.appName("feat-eng").getOrCreate()

rating_df = spark.read.csv(
    "/data/jupyter/recommendation_data/rating.csv", header=True, inferSchema=True
)
# only keep ratings the user actually liked
rating_df = rating_df.where("rating > 7")

watch_seq_df = rating_df.groupBy("user_id").agg(
    F.collect_list(F.col("anime_id").cast("string")).alias("anime_ids")
)
# print(watch_seq_df.show(2))
# +-------+--------------------+
# |user_id|           anime_ids|
# +-------+--------------------+
# |    148|[20, 81, 170, 263...|
# |    463|[20, 24, 68, 102,...|
# +-------+--------------------+

# print(watch_seq_df.printSchema())
# root
#  |-- user_id: integer (nullable = true)
#  |-- anime_ids: array (nullable = true)
#  |    |-- element: string (containsNull = false)

# collect into a Python list
watch_seq = watch_seq_df.collect()
# watch_seq format: [[20, 81, 170, 263, ...], ...]
watch_seq = [s["anime_ids"] for s in watch_seq]

# adjacency matrix
matrix = defaultdict(lambda: defaultdict(int))
for i in range(len(watch_seq)):
    seq = watch_seq[i]
    for x in range(len(seq)):
        for y in range(x + 1, len(seq)):
            a = seq[x]
            b = seq[y]
            if a == b:
                continue  # do not count the diagonal
            matrix[a][b] += 1
            matrix[b][a] += 1
The matrix has the format matrix = {anime_id: {anime_id: int}}, for example {"20": {"81": 1, "170": 2, ...}}: one user watched both 20 and 81, and two users watched both 20 and 170. To inspect a single row:

print(matrix["20"])
Transition probability matrix: decides how the current node picks the next node during a DeepWalk step.
Entry distribution: decides which node a DeepWalk starts from.
Transition probability matrix format: for each anime_id, its neighbours and their transition probabilities probs:

{
    "anime_id": {
        "neighbours": [2, 3, 5, 7],
        "probs": [0.16, 0.16, 0.32, 0.32]
    },
}

Each neighbour's probability is its co-watch count divided by the node's total co-watch count; for example, neighbour counts [1, 1, 2, 2] normalize to [1/6, 1/6, 1/3, 1/3], roughly the [0.16, 0.16, 0.32, 0.32] shown above.
Entry distribution definition: for each anime_id, its total co-watch count with its neighbours divided by the sum of those totals over all anime_ids, e.g.:

[0.001953672356421993, 0.0004123166720890604, 0.0008729517041885576, ...]
def get_transfer_prob(vs):
    # vs is a dict: {neighbour_anime_id: co-watch count}
    neighbours = list(vs.keys())
    total_weight = sum(vs.values())
    probs = [vs[k] / total_weight for k in vs.keys()]
    return {"neighbours": neighbours, "prob": probs}


tranfer_probs = {k: get_transfer_prob(v) for k, v in matrix.items()}
entrance_items = list(tranfer_probs)  # the dict keys as a list

# total co-watch count between anime_id k and all of its neighbours
neighbour_sum = {k: sum(matrix[k].values()) for k in entrance_items}
# neighbour_sum = {'20': 1213,
#                  '81': 256,
#                  '170': 542,
#                  '263': 649,
#                  ...
#                  }

total_sum = sum(neighbour_sum.values())
# entry distribution
entrence_probs = [neighbour_sum[i] / total_sum for i in entrance_items]
# entrence_probs = [0.001953672356421993,
#                   0.0004123166720890604,
#                   0.0008729517041885576,
#                   0.001045287188225782,
#                   0.0008278545681788165,
#                   ...
#                   ]
The walk procedure:
1) Pick a random starting node according to the entry distribution (entrance_items with probabilities entrence_probs) and add it to the path.
2) Loop length times: pick the next node at random according to the current node's transition probabilities in tranfer_probs and add it to the path.
3) Update the current node.
4) Steps 2-3 produce one path of length + 1 nodes.
5) Repeat the whole walk n times to get n samples.
import numpy as np

rng = np.random.default_rng()

# tranfer_probs =
# {
#     "1": {
#         "neighbours": [2, 3, 5, 7],
#         "prob": [0.16, 0.16, 0.32, 0.32]
#     }
# }
def one_walk(length, entrance_items, entrence_probs, tranfer_probs):
    """
    length (int): walk length
    entrance_items (list): anime_ids
    entrence_probs (list): probability of each entry in entrance_items
    tranfer_probs (dict): transition probabilities for each anime_id
    """
    # pick a starting node from entrance_items according to entrence_probs
    start_point = rng.choice(entrance_items, 1, p=entrence_probs)[0]
    path = [str(start_point)]
    current_point = start_point
    for _ in range(length):
        # pick one of current_point's neighbours ...
        neighbours = tranfer_probs[current_point]["neighbours"]
        transfor_prob = tranfer_probs[current_point]["prob"]
        # ... according to its transition probabilities
        next_point = rng.choice(neighbours, 1, p=transfor_prob)[0]
        path.append(str(next_point))
        current_point = next_point
    return path


n = 500  # to limit compute, generate only 500 walks, each of length 21
sample = [one_walk(20, entrance_items, entrence_probs, tranfer_probs) for _ in range(n)]
Each element of sample is one walk: a list of 21 anime_ids stored as strings.
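As a quick sanity check on the shape of the result (a minimal sketch; the actual anime_ids differ from run to run because the walks are random):

# 500 walks, each holding the start node plus 20 sampled steps
print(len(sample), len(sample[0]))  # 500 21
print(sample[0][:3])                # first three anime_ids of the first walk (varies per run)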
Convert the sampled walks into a DataFrame and train dense vectors with Spark's Word2Vec.
from pyspark.ml.feature import Word2Vec

# convert the walks into a DataFrame
sample_df = spark.createDataFrame([[row] for row in sample], ["anime_ids"])

item2vec = Word2Vec(vectorSize=5, maxIter=2, windowSize=15)  # Spark's Word2Vec trains a skip-gram model
item2vec.setInputCol("anime_ids")
item2vec.setOutputCol("anime_ids_vec")
model = item2vec.fit(sample_df)

# find the 10 items closest to anime_id=20
# rec = model.findSynonyms("20", 10)

# pull out the trained vectors for the next step
item_vec = model.getVectors().collect()
item_emb = {}
for item in item_vec:
    # print(item.word)  # 9936
    # print(item.vector.toArray())
    # [-0.01321484  0.18788637  0.01939221  0.15455356  0.00537518]
    # break
    item_emb[item.word] = item.vector.toArray()

# check the result
# item_emb["20"]
# # array([ 0.00960128, -0.1283727 , -0.01474599, -0.1391744 , -0.14709859])
The build_user_emb function looks up the embedding of every anime_id a user has watched and averages them, so each user ends up with an embedding derived from their whole watch history.
@F.udf(returnType="array<float>")
def build_user_emb(anime_seq):
    # look up every watched anime_id, dropping the ones without an embedding
    anime_embs = [item_emb[aid] if aid in item_emb else [] for aid in anime_seq]
    anime_embs = list(filter(lambda l: len(l) > 0, anime_embs))
    # average the item embeddings into one user embedding
    emb = np.mean(anime_embs, axis=0)
    return emb.tolist()


user_emb_df = watch_seq_df.withColumn("user_emb", build_user_emb(F.col("anime_ids")))
print(user_emb_df.show(3))
# +-------+--------------------+--------------------+
# |user_id|           anime_ids|            user_emb|
# +-------+--------------------+--------------------+
# |    148|[20, 81, 170, 263...|[0.22693123, -0.0...|
# |    463|[20, 24, 68, 102,...|[0.24494155, -0.0...|
# |    471| [1604, 6702, 10681]|[0.37150145, 0.04...|
# +-------+--------------------+--------------------+
Start a local Redis service with Docker (for example, docker run -d -p 6379:6379 redis), so the default Redis() connection on localhost:6379 works.
from redis import Redis

redis = Redis()

user_emb = user_emb_df.collect()
# map each user_id to its embedding
user_emb = {row.user_id: row.user_emb for row in user_emb}


# helper: serialize a float vector into a ":"-separated string
def vec2str(vec):
    if vec is None:
        return ""
    return ":".join([str(v) for v in vec])


def save_user_emb(embs):
    str_emb = {item_id: vec2str(v) for item_id, v in embs.items()}
    redis.hset("recall-user-emb", mapping=str_emb)


# save the user embeddings to Redis
save_user_emb(user_emb)

# test
# redis.hget("recall-user-emb", "148")
# # b'0.140301913022995:-0.06533453613519669:-0.10318902134895325:0.07486995309591293:0.022806329652667046'
Read the data back from Redis:
# parse a ":"-separated string back into a list of floats
def str2vec(s):
    if len(s) == 0:
        return None
    return [float(x) for x in s.split(":")]


def load_user_emb():
    result = redis.hgetall("recall-user-emb")
    return {user_id.decode(): str2vec(emb.decode()) for user_id, emb in result.items()}


# load everything back
load_user_emb()
Use the locality-sensitive hashing (LSH) index from the faiss package.

pip install faiss-cpu  # install the package

Project page: https://github.com/facebookresearch/faiss
import faiss
import numpy as np

emb_items = item_emb.items()
# emb_items format:
# dict_items([('9936', array([-0.02695967, -0.14685549, -0.13547155, -0.00479672, -0.00417542])),
#             ('710',  array([ 0.03320758, -0.11462902,  0.17806329, -0.3202453 ,  0.12857111])), ...])
emb_items = list(filter(lambda t: len(t[1]) > 0, emb_items))

# anime_ids
item_ids = [i[0] for i in emb_items]
# the embedding of each anime_id
embs = [i[1] for i in emb_items]

# LSH index: 5-dimensional vectors hashed into 256 bits
index = faiss.IndexLSH(len(embs[0]), 256)
index.add(np.asarray(embs, dtype=np.float32))

# search for the 10 vectors closest to embs[99] = [-0.03878592  0.15011692  0.01134511  0.03049661  0.11688153]
# D is the distance (as defined internally by faiss), I is the index of each matched vector
D, I = index.search(np.asarray([embs[99]], dtype=np.float32), 10)
print(D)  # [[ 0.  8. 23. 25. 29. 32. 33. 33. 39. 39.]]
print(I)  # [[ 99 690 540 347 754 788 170 186 206 612]]
Finally, the idea is to take a user's embedding (the average of the embeddings of every anime the user watched) and look up the nearest anime_id embeddings. In the demo above, embs[99] is an item embedding standing in for that query; either way, the search returns 10 hits including the query itself, so the remaining 9 anime_ids can be recommended to the user:

similar_anime_id = [item_ids[i] for i in I[0][1:]]
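To close the loop, here is a minimal sketch (assuming the faiss index and the Redis hash built above are both available, and reusing the example user_id 148 from earlier) of serving a recommendation from the stored user embedding:

# load all user embeddings back from Redis and pick one user's averaged vector
user_embs = load_user_emb()                                # {user_id(str): [float, ...]}
query = np.asarray([user_embs["148"]], dtype=np.float32)   # "148" is just the example user_id from above

# query the item LSH index with the user embedding (same 5 dimensions as the item vectors)
D, I = index.search(query, 10)
recommended_anime_ids = [item_ids[i] for i in I[0]]
print(recommended_anime_ids)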