赞
踩
在上一篇中我们简单介绍了常用的Graph Embedding算法,今天来对其中较为常用的两种算法——DeepWalk和Node2Vec进行 Python 代码实现。
Karate Club 是一个 Python 算法包,专门用于图分析和图挖掘。它提供了一系列经典和先进的图聚类和图嵌入算法,旨在帮助研究人员和数据科学家处理和分析各种类型的图数据。该算法包的名字取自 Zachary’s Karate Club,这是一个关于社交网络的著名案例,用于研究社区结构和社交网络动态性。Karate Club 旨在为用户提供一种简单而灵活的方式来探索和分析图数据,同时提供了一些高效的算法和工具,以解决图分析中的常见问题。
import networkx as nx
# Create or load a graph (here: the karate club graph that ships with NetworkX)
G = nx.karate_club_graph()
Karate Club实现
from karateclub import DeepWalk
from karateclub import Estimator
# Initialize the DeepWalk model
deepwalk = DeepWalk(walk_number=10, walk_length=80, dimensions=64, epochs=10)
# Train the model on the graph
deepwalk.fit(G)
# Retrieve the node embedding
embedding = deepwalk.get_embedding()
模型参数
import random
from functools import partial
from typing import List, Callable

import numpy as np
from gensim.models.word2vec import Word2Vec


# Random walk
class RandomWalker:
    """
    Class to do fast first-order (uniform) random walks.

    Args:
        walk_length (int): Maximum number of nodes in each truncated walk.
        walk_number (int): Number of walks started from every node.
    """

    def __init__(self, walk_length: int, walk_number: int):
        self.walk_length = walk_length
        self.walk_number = walk_number

    def do_walk(self, node: int) -> List[str]:
        """
        Do a single truncated random walk from a source node.

        Arg types:
            * **node** *(int)* - The source node of the random walk.

        Return types:
            * **walk** *(list of strings)* - A single truncated random walk.
              It is shorter than ``walk_length`` when the walk reaches a node
              without neighbors.
        """
        walk = [node]
        for _ in range(self.walk_length - 1):
            neighbors = list(self.graph.neighbors(walk[-1]))
            if not neighbors:
                # Dead end: the walk cannot move any further, so further
                # iterations would not change it.
                break
            # Append in place instead of rebuilding the list on every step.
            walk.append(random.choice(neighbors))
        # Word2Vec expects string tokens.
        return [str(w) for w in walk]

    def do_walks(self, graph):
        """
        Do a fixed number of truncated random walks from every node in the graph.

        The walks are stored in ``self.walks``.

        Arg types:
            * **graph** *(NetworkX graph)* - The graph to run the random walks on.
        """
        self.walks: List[List[str]] = []
        self.graph = graph
        for node in self.graph.nodes():
            for _ in range(self.walk_number):
                self.walks.append(self.do_walk(node))


# DeepWalk
class CustomDeepWalk(Estimator):
    """
    DeepWalk: uniform random walks fed to a Word2Vec model.

    Args:
        walk_number (int): Number of random walks per node. Default is 10.
        walk_length (int): Length of each random walk. Default is 80.
        dimensions (int): Dimensionality of the embedding. Default is 128.
        workers (int): Number of Word2Vec worker threads. Default is 4.
        window_size (int): Word2Vec window size. Default is 5.
        epochs (int): Number of Word2Vec epochs. Default is 1.
        use_hierarchical_softmax (bool): Train with hierarchical softmax
            (``hs=1``) when True. Default is True.
        number_of_negative_samples (int): Word2Vec ``negative`` value. Default is 5.
        learning_rate (float): Word2Vec ``alpha`` value. Default is 0.05.
        min_count (int): Word2Vec ``min_count`` value. Default is 1.
        seed (int): Random seed. Default is 42.
    """

    def __init__(
        self,
        walk_number: int = 10,
        walk_length: int = 80,
        dimensions: int = 128,
        workers: int = 4,
        window_size: int = 5,
        epochs: int = 1,
        use_hierarchical_softmax: bool = True,
        number_of_negative_samples: int = 5,
        learning_rate: float = 0.05,
        min_count: int = 1,
        seed: int = 42,
    ):
        self.walk_number = walk_number
        self.walk_length = walk_length
        self.dimensions = dimensions
        self.workers = workers
        self.window_size = window_size
        self.epochs = epochs
        self.use_hierarchical_softmax = use_hierarchical_softmax
        self.number_of_negative_samples = number_of_negative_samples
        self.learning_rate = learning_rate
        self.min_count = min_count
        self.seed = seed

    def fit(self, graph: nx.classes.graph.Graph):
        """
        Fit a DeepWalk model.

        Arg types:
            * **graph** *(NetworkX graph)* - The graph to be embedded.
        """
        # _set_seed and _check_graph are inherited from Estimator (not shown here).
        self._set_seed()
        graph = self._check_graph(graph)
        # Generate the node sequences with random walks
        walker = RandomWalker(self.walk_length, self.walk_number)
        walker.do_walks(graph)
        # Word2Vec
        model = Word2Vec(
            walker.walks,
            hs=1 if self.use_hierarchical_softmax else 0,
            negative=self.number_of_negative_samples,
            alpha=self.learning_rate,
            epochs=self.epochs,
            vector_size=self.dimensions,
            window=self.window_size,
            min_count=self.min_count,
            workers=self.workers,
            seed=self.seed,
        )
        num_of_nodes = graph.number_of_nodes()
        # NOTE(review): this lookup assumes the nodes are labeled 0..n-1;
        # presumably _check_graph enforces that - confirm against Estimator.
        self._embedding = [model.wv[str(n)] for n in range(num_of_nodes)]

    def get_embedding(self) -> np.ndarray:
        r"""Get the node embedding.

        Return types:
            * **embedding** *(Numpy array)* - The embedding of nodes.
        """
        return np.array(self._embedding)


# Initialize the custom DeepWalk model
customdeepwalk = CustomDeepWalk(walk_number=10, walk_length=80, dimensions=64, epochs=10)
# Train the model
customdeepwalk.fit(G)
# Retrieve the embedding
embedding = customdeepwalk.get_embedding()
from karateclub import Node2Vec
# Initialize the Node2Vec model
model = Node2Vec()
# Train the Node2Vec model
model.fit(G)
# Retrieve the node embedding vectors
embeddings = model.get_embedding()
模型参数
def _undirected(node, graph) -> List[tuple]:
    # Undirected graph: return every edge incident to the node.
    # data=True keeps the attribute dict as the last tuple element, which
    # _weighted reads; without it edge[-1] would be the neighbor label.
    edges = graph.edges(node, data=True)
    return edges


def _directed(node, graph) -> List[tuple]:
    # Directed graph: return only the outgoing edges.
    edges = graph.out_edges(node, data=True)
    return edges


def _get_edge_fn(graph) -> Callable:
    # Pick the edge accessor that matches the graph type and bind the graph to it.
    fn = _directed if nx.classes.function.is_directed(graph) else _undirected
    fn = partial(fn, graph=graph)
    return fn


def _unweighted(edges: List[tuple]) -> np.ndarray:
    # No weights: every edge counts as 1.
    return np.ones(len(edges))


def _weighted(edges: List[tuple]) -> np.ndarray:
    # Weighted graph: use each edge's own "weight" attribute.
    # Cast to float because np.piecewise returns an array of its input's
    # dtype, so integer weights would truncate w / p and w / q.
    return np.array([edge[-1]["weight"] for edge in edges], dtype=float)


def _get_weight_fn(graph) -> Callable:
    # Choose the weight accessor depending on whether the graph is weighted.
    fn = _weighted if nx.classes.function.is_weighted(graph) else _unweighted
    return fn


# Biased random walk (introduces p and q)
class BiasedRandomWalker:
    """
    Class to do second-order random walks biased by p and q.

    Args:
        walk_length (int): Maximum number of nodes in each truncated walk.
        walk_number (int): Number of walks started from every node.
        p (float): Return parameter (1/p transition probability) to move towards previous node.
        q (float): In-out parameter (1/q transition probability) to move away from previous node.
    """

    walks: list
    graph: nx.classes.graph.Graph
    # Callables selected in do_walks according to the graph type
    edge_fn: Callable
    weight_fn: Callable

    def __init__(self, walk_length: int, walk_number: int, p: float, q: float):
        self.walk_length = walk_length
        self.walk_number = walk_number
        self.p = p
        self.q = q

    def do_walk(self, node: int) -> List[str]:
        """
        Do a single truncated second-order random walk from a source node.

        Arg types:
            * **node** *(int)* - The source node of the random walk.

        Return types:
            * **walk** *(list of strings)* - A single truncated random walk.
              It is shorter than ``walk_length`` when the walk reaches a node
              without (outgoing) edges.
        """
        walk = [node]
        previous_node = None
        previous_node_neighbors = []
        for _ in range(self.walk_length - 1):
            current_node = walk[-1]
            edges = self.edge_fn(current_node)
            # Neighbors of the current node
            current_node_neighbors = np.array([edge[1] for edge in edges])
            if current_node_neighbors.size == 0:
                # Dead end: np.random.choice cannot sample from an empty array.
                break
            # Weights of the edges leading to those neighbors
            weights = self.weight_fn(edges)
            # Turn the weights into unnormalized transition scores
            probability = np.piecewise(
                weights,
                [
                    # Candidate is the node the walk just came from -> w / p
                    current_node_neighbors == previous_node,
                    # Candidate is also a neighbor of the previous node -> w / 1
                    np.isin(current_node_neighbors, previous_node_neighbors),
                ],
                # The third function is the default for all other candidates -> w / q
                [lambda w: w / self.p, lambda w: w / 1, lambda w: w / self.q],
            )
            # Normalize the scores into a probability distribution
            norm_probability = probability / probability.sum()
            # Draw one neighbor according to norm_probability
            selected = np.random.choice(current_node_neighbors, 1, p=norm_probability)[0]
            walk.append(selected)
            previous_node_neighbors = current_node_neighbors
            previous_node = current_node
        # Word2Vec expects string tokens.
        return [str(w) for w in walk]

    def do_walks(self, graph) -> None:
        """
        Do a fixed number of truncated random walks from every node in the graph.

        The walks are stored in ``self.walks``.

        Arg types:
            * **graph** *(NetworkX graph)* - The graph to run the random walks on.
        """
        self.walks = []
        self.graph = graph
        self.edge_fn = _get_edge_fn(graph)
        self.weight_fn = _get_weight_fn(graph)
        for node in self.graph.nodes():
            for _ in range(self.walk_number):
                self.walks.append(self.do_walk(node))


# Node2Vec
class CustomNode2Vec(Estimator):
    """
    Node2Vec: biased second-order random walks fed to a Word2Vec model.

    Args:
        walk_number (int): Number of random walks per node. Default is 10.
        walk_length (int): Length of each random walk. Default is 80.
        p (float): Return parameter passed to the walker. Default is 1.0.
        q (float): In-out parameter passed to the walker. Default is 1.0.
        dimensions (int): Dimensionality of the embedding. Default is 128.
        workers (int): Number of Word2Vec worker threads. Default is 4.
        window_size (int): Word2Vec window size. Default is 5.
        epochs (int): Number of Word2Vec epochs. Default is 1.
        use_hierarchical_softmax (bool): Train with hierarchical softmax
            (``hs=1``) when True. Default is True.
        number_of_negative_samples (int): Word2Vec ``negative`` value. Default is 5.
        learning_rate (float): Word2Vec ``alpha`` value. Default is 0.05.
        min_count (int): Word2Vec ``min_count`` value. Default is 1.
        seed (int): Random seed. Default is 42.
    """

    def __init__(
        self,
        walk_number: int = 10,
        walk_length: int = 80,
        p: float = 1.0,
        q: float = 1.0,
        dimensions: int = 128,
        workers: int = 4,
        window_size: int = 5,
        epochs: int = 1,
        use_hierarchical_softmax: bool = True,
        number_of_negative_samples: int = 5,
        learning_rate: float = 0.05,
        min_count: int = 1,
        seed: int = 42,
    ):
        self.walk_number = walk_number
        self.walk_length = walk_length
        self.p = p
        self.q = q
        self.dimensions = dimensions
        self.workers = workers
        self.window_size = window_size
        self.epochs = epochs
        self.use_hierarchical_softmax = use_hierarchical_softmax
        self.number_of_negative_samples = number_of_negative_samples
        self.learning_rate = learning_rate
        self.min_count = min_count
        self.seed = seed

    def fit(self, graph: nx.classes.graph.Graph):
        """
        Fit a Node2Vec model.

        Arg types:
            * **graph** *(NetworkX graph)* - The graph to be embedded.
        """
        # _set_seed and _check_graph are inherited from Estimator (not shown here).
        self._set_seed()
        graph = self._check_graph(graph)
        # Generate the node sequences with biased random walks
        walker = BiasedRandomWalker(self.walk_length, self.walk_number, self.p, self.q)
        walker.do_walks(graph)
        # Word2Vec
        model = Word2Vec(
            walker.walks,
            hs=1 if self.use_hierarchical_softmax else 0,
            negative=self.number_of_negative_samples,
            alpha=self.learning_rate,
            epochs=self.epochs,
            vector_size=self.dimensions,
            window=self.window_size,
            min_count=self.min_count,
            workers=self.workers,
            seed=self.seed,
        )
        num_of_nodes = graph.number_of_nodes()
        # NOTE(review): this lookup assumes the nodes are labeled 0..n-1;
        # presumably _check_graph enforces that - confirm against Estimator.
        self._embedding = [model.wv[str(n)] for n in range(num_of_nodes)]

    def get_embedding(self) -> np.ndarray:
        r"""Get the node embedding.

        Return types:
            * **embedding** *(Numpy array)* - The embedding of nodes.
        """
        return np.array(self._embedding)


# Initialize the custom Node2Vec model
customnode2vec = CustomNode2Vec(walk_number=10, walk_length=80, dimensions=128, epochs=10)
# Train the model
customnode2vec.fit(G)
# Retrieve the embedding
embedding = customnode2vec.get_embedding()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。