DGL is currently a very popular Python package for research on graph neural networks, with knowledge graphs as a representative application. While reading the project code at GitHub@RE-Net I found the library indispensable: almost any GNN training setup today uses it to build the network. For an abstract of the paper behind that project see 【论文阅读】时间序列中的变量是一张知识图谱. That paper is arguably the current benchmark in temporal knowledge graphs, with state-of-the-art evaluation results; its model mainly combines an autoregressive neural network with an RGCN neighborhood aggregator.
After getting the code to run on my local CPU I tried the GPU, only to find that 4 GB of VRAM was nowhere near enough, not even for the pretraining stage, so I decided to first read through the DGL official documentation. The User Guide is genuinely excellent. Unlike libraries whose documentation is just an endless list of API signatures (torch, sklearn, and tensorflow come to mind), DGL's User Guide is clearly layered, organized along the timeline from building a graph neural network to training it, and explains DGL usage in great detail. It reads more like a survey paper: going through it once teaches you not only GNNs themselves but also the mathematics behind techniques such as message passing and neighborhood sampling, with plenty of figures. A rare resource indeed.
This post is essentially a translation of the DGL official documentation. As of publication, Chapters 1-4 already have an official Chinese translation, to which I have added some personal notes for reference; Chapters 5-7 exist only in English, so for those I did the translation and annotation myself to make them easier to read.
Note: everything in this post uses DGL with the PyTorch backend.
PS: reading the guide also consolidates your torch skills. Much of DGL's data processing, model training, model construction, and custom modules work just like torch. In short, I strongly recommend reading the official documentation in full; it took me about two full days, and I believe the time is well spent. DGL's documented installation procedure seems a bit convoluted; simply downloading the wheel file and installing it works fine.
For the dgl library, download the matching whl file from the Tsinghua mirror's dgl repository and install it with pip install. For GPU use there are currently five different dgl packages, one per CUDA version: dgl puts the CUDA version right in the package name, whereas for tensorflow-gpu you must consult a table of which CUDA each release needs; torch and torchvision can be downloaded from https://download.pytorch.org/whl/torch_stable.html , where the corresponding CUDA version is also annotated. dgl goes as low as CUDA 9.0, while tensorflow-gpu requires at least CUDA 10.0 from 2.0.0 onward, so on a machine meant for GPU work it is more practical to install several CUDA versions side by side than to clutter it with miscellaneous software. CUDA setup is quite quick these days: on my WIN10 + 1650 Ti (NVIDIA) + SSD machine one CUDA version installs in a dozen or so minutes, and the roughly 3 GB offline installer downloads from the NVIDIA site in under half an hour, apparently through a domestic mirror, far more reliable than it used to be.
The default backend of the dgl library can be changed by editing the configuration values in C:\Users\caoyang\.dgl\config.json. In practice the choices are pytorch and tensorflow; the DGL official documentation additionally mentions an MXNet backend, but the later chapters are written almost entirely with pytorch examples and barely touch the other two. torch seems to be overtaking tensorflow in momentum: Google's tensorflow is off playing in its own TPU garden, ceding the NVIDIA and AMD cards to other open-source developers. Personally I find tensorflow less and less usable, in every sense of the word... and the recent papers I have read, all published within the past year, ship project code written in torch. To each their own; for working engineers the only real option may be to learn them all...
The DGL official documentation opens with a very fun introductory example.
Edges are stored as two lists (source endpoints and destination endpoints). For sparse graphs (i.e. sparse adjacency matrices) this greatly reduces storage cost, and there is no need to record the node set separately: concatenating the two lists and deduplicating yields all nodes. Isolated nodes (zero in-degree and out-degree), however, will not be captured this way.
import dgl
import numpy as np

def build_karate_club_graph():
    # All 78 edges are stored in two numpy arrays. One for source endpoints
    # while the other for destination endpoints.
    src = np.array([1, 2, 2, 3, 3, 3, 4, 5, 6, 6, 6, 7, 7, 7, 7, 8, 8, 9, 10, 10,
                    10, 11, 12, 12, 13, 13, 13, 13, 16, 16, 17, 17, 19, 19, 21, 21,
                    25, 25, 27, 27, 27, 28, 29, 29, 30, 30, 31, 31, 31, 31, 32, 32,
                    32, 32, 32, 32, 32, 32, 32, 32, 32, 33, 33, 33, 33, 33, 33, 33,
                    33, 33, 33, 33, 33, 33, 33, 33, 33, 33])
    dst = np.array([0, 0, 1, 0, 1, 2, 0, 0, 0, 4, 5, 0, 1, 2, 3, 0, 2, 2, 0, 4,
                    5, 0, 0, 3, 0, 1, 2, 3, 5, 6, 0, 1, 0, 1, 0, 1, 23, 24, 2, 23,
                    24, 2, 23, 26, 1, 8, 0, 24, 25, 28, 2, 8, 14, 15, 18, 20, 22, 23,
                    29, 30, 31, 8, 9, 13, 14, 15, 18, 19, 20, 22, 23, 26, 27, 28, 29,
                    30, 31, 32])
    # Edges are directional in DGL; Make them bi-directional.
    u = np.concatenate([src, dst])
    v = np.concatenate([dst, src])
    # Construct a DGLGraph
    return dgl.DGLGraph((u, v))

G = build_karate_club_graph()
print('We have %d nodes.' % G.number_of_nodes())
print('We have %d edges.' % G.number_of_edges())
We have 34 nodes.
We have 156 edges.
The graph can be visualized with networkx. To display the figure produced by nx.draw() in a notebook, add the %matplotlib inline directive to the code.
import networkx as nx
%matplotlib inline
# Since the actual graph is undirected, we convert it for visualization
# purpose.
nx_G = G.to_networkx().to_undirected()
# Kamada-Kawaii layout usually looks pretty for arbitrary graphs
pos = nx.kamada_kawai_layout(nx_G)
nx.draw(nx_G, pos, with_labels=True, node_color=[[.7, .7, .7]])
Both the edges and the nodes of a DGLGraph can be assigned values; assignment here means attaching features, and there can of course be more than one. The example below adds a feature named feat to all nodes; to assign values to edges instead, simply replace ndata with edata.
# In DGL, you can add features for all nodes at once, using a feature tensor that
# batches node features along the first dimension. The code below adds the learnable
# embeddings for all nodes:
import torch
import torch.nn as nn
import torch.nn.functional as F

embed = nn.Embedding(34, 5)  # 34 nodes with embedding dim equal to 5
G.ndata['feat'] = embed.weight

# print out node 2's input feature
print(G.ndata['feat'][2])

# print out node 10 and 11's input features
print(G.ndata['feat'][[10, 11]])
tensor([ 0.4228, -1.1062, -0.1551, 1.1317, 0.9008], grad_fn=<SelectBackward>)
tensor([[ 0.3872, 0.9674, -0.0219, 0.3755, -0.6305],
[-0.7338, -0.4529, 1.1352, -0.6787, -1.0478]],
grad_fn=<IndexBackward>)
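For edges the pattern is identical; a minimal sketch of the edata analog mentioned above (the feature name 'w' here is made up for illustration):
G.edata['w'] = torch.ones(G.number_of_edges(), 1)  # one scalar feature per edge
print(G.edata['w'][2])  # feature of edge 2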
from dgl.nn.pytorch import GraphConv

class GCN(nn.Module):
    def __init__(self, in_feats, hidden_size, num_classes):
        super(GCN, self).__init__()
        self.conv1 = GraphConv(in_feats, hidden_size)
        self.conv2 = GraphConv(hidden_size, num_classes)

    def forward(self, g, inputs):
        h = self.conv1(g, inputs)
        h = torch.relu(h)
        h = self.conv2(g, h)
        return h

# The first layer transforms input features of size of 5 to a hidden size of 5.
# The second layer transforms the hidden layer and produces output features of
# size 2, corresponding to the two groups of the karate club.
net = GCN(5, 5, 2)
Simply define the labeled node IDs and their class labels, as well as the model's initial input inputs:
inputs = embed.weight
labeled_nodes = torch.tensor([0, 33]) # only the instructor and the president nodes are labeled
labels = torch.tensor([0, 1]) # their labels are different
Training is no different from ordinary torch model training; the code and output are shown below:
import itertools

optimizer = torch.optim.Adam(itertools.chain(net.parameters(), embed.parameters()), lr=0.01)
all_logits = []
for epoch in range(50):
    logits = net(G, inputs)
    # we save the logits for visualization later
    all_logits.append(logits.detach())
    logp = F.log_softmax(logits, 1)
    # we only compute loss for labeled nodes
    loss = F.nll_loss(logp[labeled_nodes], labels)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    print('Epoch %d | Loss: %.4f' % (epoch, loss.item()))
Epoch 0 | Loss: 0.8385
Epoch 1 | Loss: 0.8092
Epoch 2 | Loss: 0.7829
Epoch 3 | Loss: 0.7614
Epoch 4 | Loss: 0.7426
Epoch 5 | Loss: 0.7266
Epoch 6 | Loss: 0.7128
Epoch 7 | Loss: 0.6996
Epoch 8 | Loss: 0.6895
Epoch 9 | Loss: 0.6809
Epoch 10 | Loss: 0.6723
Epoch 11 | Loss: 0.6639
Epoch 12 | Loss: 0.6555
Epoch 13 | Loss: 0.6467
Epoch 14 | Loss: 0.6376
Epoch 15 | Loss: 0.6282
Epoch 16 | Loss: 0.6188
Epoch 17 | Loss: 0.6095
Epoch 18 | Loss: 0.5996
Epoch 19 | Loss: 0.5893
Epoch 20 | Loss: 0.5783
Epoch 21 | Loss: 0.5670
Epoch 22 | Loss: 0.5552
Epoch 23 | Loss: 0.5430
Epoch 24 | Loss: 0.5300
Epoch 25 | Loss: 0.5170
Epoch 26 | Loss: 0.5037
Epoch 27 | Loss: 0.4903
Epoch 28 | Loss: 0.4767
Epoch 29 | Loss: 0.4621
Epoch 30 | Loss: 0.4471
Epoch 31 | Loss: 0.4316
Epoch 32 | Loss: 0.4163
Epoch 33 | Loss: 0.4006
Epoch 34 | Loss: 0.3838
Epoch 35 | Loss: 0.3662
Epoch 36 | Loss: 0.3481
Epoch 37 | Loss: 0.3295
Epoch 38 | Loss: 0.3103
Epoch 39 | Loss: 0.2908
Epoch 40 | Loss: 0.2716
Epoch 41 | Loss: 0.2526
Epoch 42 | Loss: 0.2339
Epoch 43 | Loss: 0.2157
Epoch 44 | Loss: 0.1981
Epoch 45 | Loss: 0.1812
Epoch 46 | Loss: 0.1647
Epoch 47 | Loss: 0.1483
Epoch 48 | Loss: 0.1326
Epoch 49 | Loss: 0.1179
import matplotlib.animation as animation
import matplotlib.pyplot as plt

def draw(i):
    cls1color = '#00FFFF'
    cls2color = '#FF00FF'
    pos = {}
    colors = []
    for v in range(34):
        pos[v] = all_logits[i][v].numpy()
        cls = pos[v].argmax()
        colors.append(cls1color if cls else cls2color)
    ax.cla()
    ax.axis('off')
    ax.set_title('Epoch: %d' % i)
    nx.draw_networkx(nx_G.to_undirected(), pos, node_color=colors,
                     with_labels=True, node_size=300, ax=ax)

fig = plt.figure(dpi=150)
fig.clf()
ax = fig.subplots()
draw(0)  # draw the prediction of the first epoch
plt.close()
ani = animation.FuncAnimation(fig, draw, frames=len(all_logits), interval=200)
See the DGL official documentation for the accompanying prose.
The dgl library builds a graph by storing the endpoints of all edges, with nodes generally numbered by consecutive integers. dgl.graph() creates a DGLGraph object; Section 4 of this chapter covers building graphs directly from the instantiated objects of other graph libraries (such as networkx).
import dgl
import torch as th
# edges 0->1, 0->2, 0->3, 1->3
u, v = th.tensor([0, 0, 0, 1]), th.tensor([1, 2, 3, 3])
g = dgl.graph((u, v))
print(g) # number of nodes are inferred from the max node IDs in the given edges
# Node IDs
print(g.nodes())
# Edge end nodes
print(g.edges())
# Edge end nodes and edge IDs
print(g.edges(form='all'))
# If the node with the largest ID is isolated (meaning no edges),
# then one needs to explicitly set the number of nodes
g = dgl.graph((u, v), num_nodes=8)
Graph(num_nodes=4, num_edges=4,
ndata_schemes={}
edata_schemes={})
tensor([0, 1, 2, 3])
(tensor([0, 0, 0, 1]), tensor([1, 2, 3, 3]))
(tensor([0, 0, 0, 1]), tensor([1, 2, 3, 3]), tensor([0, 1, 2, 3]))
To make every edge bidirectional, use the dgl.to_bidirected() function, which converts the original graph into one that also contains the reverse edges:
bg = dgl.to_bidirected(g)
bg.edges()
(tensor([0, 0, 0, 1, 1, 2, 3, 3]), tensor([1, 2, 3, 0, 3, 0, 0, 1]))
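Note that dgl.to_bidirected() does not keep edge features; if they must survive, dgl.add_reverse_edges() with copy_edata=True is an alternative (a sketch; whether this function is available depends on your DGL version):
gw = dgl.graph((th.tensor([0, 1]), th.tensor([1, 2])))
gw.edata['w'] = th.tensor([[1.], [2.]])
rg = dgl.add_reverse_edges(gw, copy_edata=True)  # reverse edges reuse the original features
print(rg.edges())
print(rg.edata['w'])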
dgl.graph() expects tensors as input, though it also accepts arrays and lists for quick testing; tensors are handled more efficiently. The storage dtype can be adjusted via the idtype parameter of dgl.graph(): changing the default int64 to int32, for instance, saves a lot of memory. Dtype conversion works as follows:
edges = th.tensor([2, 5, 3]), th.tensor([3, 5, 0])  # edges 2->3, 5->5, 3->0
g64 = dgl.graph(edges) # DGL uses int64 by default
print(g64.idtype)
g32 = dgl.graph(edges, idtype=th.int32) # create a int32 graph
g32.idtype
g64_2 = g32.long() # convert to int64
g64_2.idtype
g32_2 = g64.int() # convert to int32
g32_2.idtype
torch.int64
torch.int32
torch.int64
torch.int32
APIs used in this section: dgl.graph(); dgl.DGLGraph.nodes(); dgl.DGLGraph.edges(); dgl.to_bidirected(); dgl.DGLGraph.int(); dgl.DGLGraph.long(); dgl.DGLGraph.idtype.
Node and edge features live in the ndata and edata attributes of a DGLGraph object:
import dgl
import torch as th
g = dgl.graph(([0, 0, 1, 5], [1, 2, 2, 0])) # 6 nodes, 4 edges
print(g)
g.ndata['x'] = th.ones(g.num_nodes(), 3) # node feature of length 3
g.edata['x'] = th.ones(g.num_edges(), dtype=th.int32) # scalar integer feature
print(g)
# different names can have different shapes
g.ndata['y'] = th.randn(g.num_nodes(), 5)
print(g.ndata['x'][1]) # get node 1's feature
print(g.edata['x'][th.tensor([0, 3])]) # get features of edge 0 and 3
Graph(num_nodes=6, num_edges=4,
ndata_schemes={}
edata_schemes={})
Graph(num_nodes=6, num_edges=4,
ndata_schemes={'x': Scheme(shape=(3,), dtype=torch.float32)}
edata_schemes={'x': Scheme(shape=(), dtype=torch.int32)})
tensor([1., 1., 1.])
tensor([1, 1], dtype=torch.int32)
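One point worth illustrating: feature assignment must provide a row for every node (or edge) at once, but rows of an existing feature tensor can still be updated in place; a small sketch:
import dgl
import torch as th

gx = dgl.graph(([0, 0, 1, 5], [1, 2, 2, 0]))      # 6 nodes, 4 edges
gx.ndata['x'] = th.zeros(gx.num_nodes(), 3)       # must cover every node
gx.ndata['x'][th.tensor([0, 1])] = th.ones(2, 3)  # in-place update of selected rows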
Points to note about ndata and edata are detailed in the official documentation; the relevant APIs are dgl.DGLGraph.ndata and dgl.DGLGraph.edata.
Creating a graph from a scipy sparse matrix:
import dgl
import torch as th
import scipy.sparse as sp
spmat = sp.rand(100, 100, density=0.05) # 5% nonzero entries
print(dgl.from_scipy(spmat)) # from SciPy
Graph(num_nodes=100, num_edges=500,
ndata_schemes={}
edata_schemes={})
Creating a graph from a networkx graph:
import networkx as nx
nx_g = nx.path_graph(5) # a chain 0-1-2-3-4
print(dgl.from_networkx(nx_g)) # from networkx
nxg = nx.DiGraph([(2, 1), (1, 2), (2, 3), (0, 0)])
print(dgl.from_networkx(nxg))
Graph(num_nodes=5, num_edges=8,
ndata_schemes={}
edata_schemes={})
Graph(num_nodes=4, num_edges=4,
ndata_schemes={}
edata_schemes={})
Note that nx.path_graph(5) is converted into 8 edges: it is an undirected networkx graph, while a DGLGraph must be directed, so each undirected edge yields both directions. APIs used: dgl.from_scipy(); dgl.from_networkx(). Heterogeneous graphs are created with dgl.heterograph():
import dgl
import torch as th

# Create a heterograph with 3 node types and 3 edge types.
graph_data = {
    ('drug', 'interacts', 'drug'): (th.tensor([0, 1]), th.tensor([1, 2])),
    ('drug', 'interacts', 'gene'): (th.tensor([0, 1]), th.tensor([2, 3])),
    ('drug', 'treats', 'disease'): (th.tensor([1]), th.tensor([2]))
}
g = dgl.heterograph(graph_data)
print(g.ntypes)
print(g.etypes)
print(g.canonical_etypes)
print(g)
print(g.metagraph().edges())
print(g.num_nodes())
print(g.num_nodes('drug'))
print(g.nodes('drug'))
['disease', 'drug', 'gene']
['interacts', 'interacts', 'treats']
[('drug', 'interacts', 'drug'), ('drug', 'interacts', 'gene'), ('drug', 'treats', 'disease')]
Graph(num_nodes={'disease': 3, 'drug': 3, 'gene': 4},
num_edges={('drug', 'interacts', 'drug'): 2, ('drug', 'interacts', 'gene'): 2, ('drug', 'treats', 'disease'): 1},
metagraph=[('drug', 'drug', 'interacts'), ('drug', 'gene', 'interacts'), ('drug', 'disease', 'treats')])
[('drug', 'drug'), ('drug', 'gene'), ('drug', 'disease')]
10
3
tensor([0, 1, 2])
The metagraph essentially records all the distinct RDF triples $(s, r, o)$ in the heterogeneous graph.
APIs used in this section: dgl.heterograph(); ntypes; etypes; canonical_etypes; metagraph; num_nodes(), which without arguments returns the total node count and with a node-type argument returns the count for that type; nodes(), which requires a node-type argument and returns the ID tensor of all nodes of that type.
# Set/get feature 'hv' for nodes of type 'drug'
g.nodes['drug'].data['hv'] = th.ones(3, 1)
print(g.nodes['drug'].data['hv'])
# Set/get feature 'he' for edge of type 'treats'
g.edges['treats'].data['he'] = th.zeros(1, 1)
print(g.edges['treats'].data['he'])
tensor([[1.],
[1.],
[1.]])
tensor([[0.]])
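Since this graph has two edge types that share the name 'interacts', indexing g.edges['interacts'] would be ambiguous; a sketch of disambiguating with the full canonical edge type:
# use the canonical (src_type, edge_type, dst_type) triple when names collide
g.edges[('drug', 'interacts', 'gene')].data['he'] = th.zeros(2, 1)
print(g.edges[('drug', 'interacts', 'gene')].data['he'])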
A subgraph containing only some relations can be extracted with the dgl.edge_type_subgraph method:
g = dgl.heterograph({
('drug', 'interacts', 'drug'): (th.tensor([0, 1]), th.tensor([1, 2])),
('drug', 'interacts', 'gene'): (th.tensor([0, 1]), th.tensor([2, 3])),
('drug', 'treats', 'disease'): (th.tensor([1]), th.tensor([2]))
})
g.nodes['drug'].data['hv'] = th.ones(3, 1)
# Retain relations ('drug', 'interacts', 'drug') and ('drug', 'treats', 'disease')
# All nodes for 'drug' and 'disease' will be retained
eg = dgl.edge_type_subgraph(g, [('drug', 'interacts', 'drug'),
('drug', 'treats', 'disease')])
print(eg)
# The associated features will be copied as well
print(eg.nodes['drug'].data['hv'])
Graph(num_nodes={'disease': 3, 'drug': 3},
num_edges={('drug', 'interacts', 'drug'): 2, ('drug', 'treats', 'disease'): 1},
metagraph=[('drug', 'drug', 'interacts'), ('drug', 'disease', 'treats')])
tensor([[1.],
[1.],
[1.]])
Heterogeneous graphs provide a clean interface for managing nodes and edges of different types together with their associated features; this is especially useful in the situations listed in the official documentation.
If none of those situations applies and the user does not want to distinguish node and edge types during modeling, dgl can convert a heterogeneous graph into a homogeneous one with dgl.DGLGraph.to_homogeneous(). In short, the procedure relabels nodes and edges of all types with consecutive integer IDs starting from 0 and merges the user-specified features across types.
Code example:
g = dgl.heterograph({
    ('drug', 'interacts', 'drug'): (th.tensor([0, 1]), th.tensor([1, 2])),
    ('drug', 'treats', 'disease'): (th.tensor([1]), th.tensor([2]))})
g.nodes['drug'].data['hv'] = th.zeros(3, 1)
g.nodes['disease'].data['hv'] = th.ones(3, 1)
g.edges['interacts'].data['he'] = th.zeros(2, 1)
g.edges['treats'].data['he'] = th.zeros(1, 2)

# By default, it does not merge any features
hg = dgl.to_homogeneous(g)
print('hv' in hg.ndata)

# Copy node features
hg = dgl.to_homogeneous(g, ndata=['hv'])
print(hg.ndata['hv'])

# Copy edge features
# For feature copy, it expects features to have
# the same size and dtype across node/edge types
hg = dgl.to_homogeneous(g, edata=['he'])
Output:
False
tensor([[1.],
[1.],
[1.],
[0.],
[0.],
[0.]])
DGLError: Cannot concatenate column he with shape Scheme(shape=(2,), dtype=torch.float32) and shape Scheme(shape=(1,), dtype=torch.float32)
Continuing the example:
# Order of node types in the heterograph
print(g.ntypes)
# Original node types
print(hg.ndata[dgl.NTYPE])
# Original type-specific node IDs
print(hg.ndata[dgl.NID])
# Order of edge types in the heterograph
print(g.etypes)
# Original edge types
print(hg.edata[dgl.ETYPE])
# Original type-specific edge IDs
print(hg.edata[dgl.EID])
Output:
['disease', 'drug']
tensor([0, 0, 0, 1, 1, 1])
tensor([0, 1, 2, 0, 1, 2])
['interacts', 'treats']
tensor([0, 0, 1])
tensor([0, 1, 0])
As shown above, the original types and type-specific IDs are stored in ndata and edata. A further note: to convert only part of a heterogeneous graph, first extract the relevant subgraph (e.g. with the dgl.edge_type_subgraph method), then convert that subgraph to a homogeneous graph.
Graphs are saved with dgl.save_graphs(filename, g_list, labels=None):
g_list is a list that may hold several graphs, e.g. [g1, g2]; labels should be a dictionary mapping strings to tensors.
import dgl
import torch as th
g1 = dgl.graph(([0, 1, 2], [1, 2, 3]))
g2 = dgl.graph(([0, 2], [2, 3]))
g2.edata["e"] = th.ones(2, 4)
from dgl.data.utils import save_graphs
graph_labels = {"glabel": th.tensor([0, 1])}
save_graphs("./data.bin", [g1, g2], graph_labels)
Graphs are loaded with dgl.load_graphs(filename, idx_list=None): idx_list, a list of integers, selects which of the saved graphs to load; the function returns graph_list and labels, the latter being the labels dictionary given at save time.
from dgl.data.utils import load_graphs
glist, label_dict = load_graphs("./data.bin") # glist will be [g1, g2]
glist, label_dict = load_graphs("./data.bin", [0]) # glist will be [g1]
There are two ways to get a DGLGraph on GPU: create it from two tensors that already live on the GPU, or move an existing graph to the CUDA device specified by device via the dgl.DGLGraph.to(device) method.
Code example:
import dgl
import torch as th
u, v = th.tensor([0, 1, 2]), th.tensor([2, 3, 4])
g = dgl.graph((u, v))
g.ndata['x'] = th.randn(5, 3) # original feature is on CPU
print(g.device)
cuda_g = g.to('cuda:0') # accepts any device objects from backend framework
print(cuda_g.device)
print(cuda_g.ndata['x'].device) # feature data is copied to GPU too
# A graph constructed from GPU tensors is also on GPU
u, v = u.to('cuda:0'), v.to('cuda:0')
g = dgl.graph((u, v))
print(g.device)
Output:
cpu
cuda:0
cuda:0
cuda:0
print(cuda_g.in_degrees())
print(cuda_g.in_edges([2, 3, 4])) # ok for non-tensor type arguments
print(cuda_g.in_edges(th.tensor([2, 3, 4]).to('cuda:0'))) # tensor type must be on GPU
cuda_g.ndata['h'] = th.randn(5, 4) # ERROR! feature must be on GPU too!
tensor([0, 0, 1, 1, 1], device='cuda:0')
(tensor([0, 1, 2], device='cuda:0'), tensor([2, 3, 4], device='cuda:0'))
(tensor([0, 1, 2], device='cuda:0'), tensor([2, 3, 4], device='cuda:0'))
DGLError: Cannot assign node feature "h" on device cpu to a graph on device cuda:0. Call DGLGraph.to() to copy the graph to the same device.
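The fix for the failing line is simply to create the tensor on the same device:
cuda_g.ndata['h'] = th.randn(5, 4, device='cuda:0')  # now both graph and feature live on GPU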
(In my own experience graph.to(device) did not always behave; in the end I had to move every torch.tensor to CUDA by hand, which was a nuisance for quite a while.)
Message functions take a single argument edges, of type dgl.EdgeBatch; edges has three member attributes, src, dst and data, for accessing the features of the source nodes, the destination nodes and the edges respectively.
Reduce (aggregation) functions take a single argument nodes, of type dgl.NodeBatch; nodes has a member attribute mailbox for accessing the messages the nodes received.
Update functions likewise take a single argument nodes, of type dgl.NodeBatch, the same as in the reduce function.
DGL ships built-in message and reduce functions in dgl.function: https://docs.dgl.ai/api/python/dgl.function.html . These include copy functions and the binary add, sub, mul, div, dot functions; in their names u denotes the source node, v the destination node, and e the edge.
For example, to sum the source node's hu feature and the destination node's hv feature and store the result on the edge's he feature, use the built-in dgl.function.u_add_v('hu', 'hv', 'he'), which is equivalent to the user-defined message function below:
def message_func(edges):
    return {'he': edges.src['hu'] + edges.dst['hv']}
Built-in reduce functions support the sum, max, min and mean operations; the first argument is the field name in mailbox, the second the destination node field. For instance dgl.function.sum('m', 'h') is equivalent to the following user-defined function that sums the received messages:
import torch

def reduce_func(nodes):
    return {'h': torch.sum(nodes.mailbox['m'], dim=1)}
Edge-wise computation can also be invoked on its own through apply_edges():
import dgl.function as fn
graph.apply_edges(fn.u_add_v('el', 'er', 'e'))
update_all() performs message generation, aggregation and node update in a single call. The update step can either be applied to the node features directly after update_all() finishes, or be passed as the update function inside update_all():
def update_all_example(graph):
    # store the result in graph.ndata['ft']
    graph.update_all(fn.u_mul_e('ft', 'a', 'm'),
                     fn.sum('m', 'ft'))
    # Call update function outside of update_all
    final_ft = graph.ndata['ft'] * 2
    return final_ft
This multiplies the source node feature ft by the edge feature a to generate the message m, then sums all messages to update the node feature ft, and finally multiplies ft by 2 to produce final_ft; the intermediate messages are cleared once the call returns. The formula is

$${\rm final\_ft}_i=2\cdot \sum_{j\in \mathcal{N}(i)}({\rm ft}_j\cdot a_{ij})$$

For how dgl's built-in functions optimize the memory consumption and computation speed of message passing, see the prose in the DGL official documentation; in summary it comes down to fused kernels, parallel edge-wise computation, and avoiding node-to-edge copies. update_all() is therefore a very efficient interface, whereas genuinely materializing messages on edges with apply_edges() can consume a great deal of memory, as in the following example:
import torch
import torch.nn as nn

linear = nn.Parameter(torch.FloatTensor(size=(1, node_feat_dim * 2)))
def concat_message_function(edges):
    # concatenate source and destination node features on every edge
    return {'cat_feat': torch.cat([edges.src['feat'], edges.dst['feat']], dim=1)}
g.apply_edges(concat_message_function)
g.edata['out'] = g.edata['cat_feat'] * linear
import dgl.function as fn
linear_src = nn.Parameter(torch.FloatTensor(size=(1, node_feat_dim)))
linear_dst = nn.Parameter(torch.FloatTensor(size=(1, node_feat_dim)))
out_src = g.ndata['feat'] * linear_src
out_dst = g.ndata['feat'] * linear_dst
g.srcdata.update({'out_src': out_src})
g.dstdata.update({'out_dst': out_dst})
g.apply_edges(fn.u_add_v('out_src', 'out_dst', 'out'))
Here the projection is applied separately to feat_src and feat_dst, so the memory footprint is small, and the addition can be optimized with the built-in u_add_v; built-in functions are generally more efficient than user-defined ones.
As mentioned earlier, dgl feature assignment cannot target only part of the nodes, but part of the graph can still be updated: build a subgraph first, then call update_all() on that subgraph. This is a standard trick in mini-batch training (see the related content in Chapter 6 of this post). Code example (a runnable sketch follows the snippet):
nid = [0, 2, 3, 6, 7, 9]
sg = g.subgraph(nid)
sg.update_all(message_func, reduce_func, apply_node_func)
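A runnable sketch of this subgraph-update pattern with built-in functions (the graph and feature names are invented for illustration); note that the subgraph records the original node IDs in sg.ndata[dgl.NID], which lets the partial result be written back to the parent graph:
import dgl
import dgl.function as fn
import torch

g = dgl.graph((torch.tensor([0, 1, 2, 3]), torch.tensor([1, 2, 3, 0])), num_nodes=10)
g.ndata['ft'] = torch.ones(10, 4)
sg = g.subgraph([0, 1, 2, 3])  # induced subgraph on the chosen nodes
sg.update_all(fn.copy_u('ft', 'm'), fn.sum('m', 'ft'))
g.ndata['ft'][sg.ndata[dgl.NID]] = sg.ndata['ft']  # write the partial update back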
Graph attention networks (GAT) and some GCN variants both apply edge weights before message aggregation. The dgl way is to store the weight as an edge feature and multiply it with the source node feature inside the message function. In the example below, affinity is the edge weight, usually a scalar; this is essentially weighted aggregation, as in attention mechanisms (a self-contained sketch follows the snippet):
import dgl.function as fn
graph.edata['a'] = affinity
graph.update_all(fn.u_mul_e('ft', 'a', 'm'),
fn.sum('m', 'ft'))
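A self-contained sketch of the weighted aggregation above, with a toy graph and random weights standing in for real affinity scores:
import dgl
import dgl.function as fn
import torch

graph = dgl.graph((torch.tensor([0, 1, 2]), torch.tensor([1, 2, 0])))
graph.ndata['ft'] = torch.randn(3, 4)
affinity = torch.rand(graph.number_of_edges(), 1)  # one scalar weight per edge
graph.edata['a'] = affinity
graph.update_all(fn.u_mul_e('ft', 'a', 'm'),  # weight each message by its edge
                 fn.sum('m', 'ft'))
print(graph.ndata['ft'])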
Message passing on heterogeneous graphs uses DGLGraph.multi_update_all(etype_dict, cross_reducer, apply_node_func=None):
etype_dict: a dict whose keys are relations and whose values are the update_all() arguments for that relation;
cross_reducer: a str naming the cross-type reducer that combines the aggregation results of the different relations; one of sum, min, max, mean, stack.
import dgl.function as fn

funcs = {}  # collect a (message_func, reduce_func) pair per relation
# (this snippet lives inside a module's forward method; see the sketch below)
for c_etype in G.canonical_etypes:
    srctype, etype, dsttype = c_etype
    Wh = self.weight[etype](feat_dict[srctype])
    # Save it in graph for message passing
    G.nodes[srctype].data['Wh_%s' % etype] = Wh
    # Specify per-relation message passing functions: (message_func, reduce_func).
    # Note that the results are saved to the same destination feature 'h', which
    # hints the type wise reducer for aggregation.
    funcs[etype] = (fn.copy_u('Wh_%s' % etype, 'm'), fn.mean('m', 'h'))
# Trigger message passing of multiple types.
G.multi_update_all(funcs, 'sum')
# return the updated node feature dictionary
return {ntype: G.nodes[ntype].data['h'] for ntype in G.ntypes}
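For context, a minimal module wrapping the loop above (a sketch along the lines of the HeteroRGCNLayer in DGL's examples, assuming edge type names are unique so they can key an nn.ModuleDict):
import dgl.function as fn
import torch.nn as nn

class HeteroRGCNLayer(nn.Module):
    def __init__(self, in_size, out_size, etypes):
        super().__init__()
        # one linear projection per edge type, referenced as self.weight[etype]
        self.weight = nn.ModuleDict({etype: nn.Linear(in_size, out_size)
                                     for etype in etypes})

    def forward(self, G, feat_dict):
        funcs = {}
        for srctype, etype, dsttype in G.canonical_etypes:
            Wh = self.weight[etype](feat_dict[srctype])
            G.nodes[srctype].data['Wh_%s' % etype] = Wh
            funcs[etype] = (fn.copy_u('Wh_%s' % etype, 'm'), fn.mean('m', 'h'))
        G.multi_update_all(funcs, 'sum')
        return {ntype: G.nodes[ntype].data['h'] for ntype in G.ntypes}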
A single multi_update_all call handles everything; the return value is the dictionary of updated node features.
The dgl.nn module is the basic building block for GNN models. Depending on the dgl backend, its classes inherit from the backend's module class (with torch, naturally, torch.nn.Module, just like a custom torch layer or network), so parameter registration in the constructor and the tensor operations in the forward pass also follow the backend framework. In essence dgl can be viewed as a plugin that embeds directly into the backend deep-learning library, which is very convenient; the only real difference is that dgl defines the message-passing framework.
The full contents of the dgl.nn module are listed at https://docs.dgl.ai/api/python/nn.html , including the common convolution layers, dense layers, global pooling layers and some utility functions.
This chapter uses the torch backend and introduces custom-layer writing through the implementation logic of the dgl.nn.pytorch.conv.SAGEConv layer.
The SAGEConv math is given below; it will be used to build the forward function in Section 2:
$$h_{\mathcal{N}(dst)}^{(l+1)}={\rm aggregate}\left(\{h_{src}^{l},\forall src\in \mathcal{N}(dst)\}\right)$$
$$h_{dst}^{l+1}=\sigma\left(W\cdot {\rm concat}(h_{dst}^{l},h_{\mathcal{N}(dst)}^{l+1})+b\right)$$
$$h_{dst}^{l+1}={\rm norm}(h_{dst}^{l+1})$$
import torch.nn as nn
from dgl.utils import expand_as_pair

class SAGEConv(nn.Module):
    def __init__(self,
                 in_feats,
                 out_feats,
                 aggregator_type,
                 bias=True,
                 norm=None,
                 activation=None):
        super(SAGEConv, self).__init__()

        self._in_src_feats, self._in_dst_feats = expand_as_pair(in_feats)
        self._out_feats = out_feats
        self._aggre_type = aggregator_type
        self.norm = norm
        self.activation = activation

        # aggregator type: mean, max_pool, lstm, gcn
        if aggregator_type not in ['mean', 'max_pool', 'lstm', 'gcn']:
            raise KeyError('Aggregator type {} not supported.'.format(aggregator_type))
        if aggregator_type == 'max_pool':
            self.fc_pool = nn.Linear(self._in_src_feats, self._in_src_feats)
        if aggregator_type == 'lstm':
            self.lstm = nn.LSTM(self._in_src_feats, self._in_src_feats, batch_first=True)
        if aggregator_type in ['mean', 'max_pool', 'lstm']:
            self.fc_self = nn.Linear(self._in_dst_feats, out_feats, bias=bias)
        self.fc_neigh = nn.Linear(self._in_src_feats, out_feats, bias=bias)
        self.reset_parameters()

    def reset_parameters(self):
        """Reinitialize learnable parameters."""
        gain = nn.init.calculate_gain('relu')
        if self._aggre_type == 'max_pool':
            nn.init.xavier_uniform_(self.fc_pool.weight, gain=gain)
        if self._aggre_type == 'lstm':
            self.lstm.reset_parameters()
        if self._aggre_type != 'gcn':
            nn.init.xavier_uniform_(self.fc_self.weight, gain=gain)
        nn.init.xavier_uniform_(self.fc_neigh.weight, gain=gain)
Notes on the constructor: the aggregation type is stored in self._aggre_type; common options are mean, sum, max and min, and some modules use more complex aggregation functions such as lstm. self.norm is a feature-normalization function; in SAGEConv's definition the normalization can be L2, i.e. dividing the feature by its 2-norm. Submodules are torch.nn.Module objects such as torch.nn.Linear and torch.nn.LSTM, and reset_parameters performs the weight initialization.
As with torch's forward function, the forward function here carries out the actual message-passing computation. Besides the usual tensor operations it takes one extra parameter, a dgl.DGLGraph, and it generally consists of three parts: checking the graph type and expanding the input features accordingly, performing the message passing, and updating the features after aggregation. The following walks through these three parts using SAGEConv's forward function.
def forward(self, graph, feat):
    with graph.local_scope():
        # Specify graph type then expand input feature according to graph type
        feat_src, feat_dst = expand_as_pair(feat, graph)

# expand_as_pair (from dgl.utils; F here is DGL's backend module):
def expand_as_pair(input_, g=None):
    if isinstance(input_, tuple):
        # Bipartite graph case
        return input_
    elif g is not None and g.is_block:
        # Subgraph block case
        if isinstance(input_, Mapping):
            input_dst = {
                k: F.narrow_row(v, 0, g.number_of_dst_nodes(k))
                for k, v in input_.items()}
        else:
            input_dst = F.narrow_row(input_, 0, g.number_of_dst_nodes())
        return input_, input_dst
    else:
        # Homogeneous graph case
        return input_, input_
The source node features feat_src and destination node features feat_dst must be set according to the graph type; expand_as_pair derives both from feat. For a homogeneous graph, source and destination features are the same tensor. For a bipartite graph, described by a canonical edge type (src_type, edge_type, dst_type), feat is a tuple whose first element holds the source node features and whose second element holds the destination node features. For a subgraph block, the dst nodes are placed at the front of the node list when the block is created, so feat_dst is found by slicing with [0:g.number_of_dst_nodes()].
Message passing and aggregation are implemented with update_all() and dgl's built-in message and reduce functions, which helps performance optimization:
import dgl.function as fn
import torch.nn.functional as F
from dgl.utils import check_eq_shape

h_self = feat_dst  # (added for completeness; in the DGL source h_self is feat_dst)
if self._aggre_type == 'mean':
    graph.srcdata['h'] = feat_src
    graph.update_all(fn.copy_u('h', 'm'), fn.mean('m', 'neigh'))
    h_neigh = graph.dstdata['neigh']
elif self._aggre_type == 'gcn':
    check_eq_shape(feat)
    graph.srcdata['h'] = feat_src
    graph.dstdata['h'] = feat_dst
    graph.update_all(fn.copy_u('h', 'm'), fn.sum('m', 'neigh'))
    # divide in_degrees
    degs = graph.in_degrees().to(feat_dst)
    h_neigh = (graph.dstdata['neigh'] + graph.dstdata['h']) / (degs.unsqueeze(-1) + 1)
elif self._aggre_type == 'max_pool':
    graph.srcdata['h'] = F.relu(self.fc_pool(feat_src))
    graph.update_all(fn.copy_u('h', 'm'), fn.max('m', 'neigh'))
    h_neigh = graph.dstdata['neigh']
else:
    raise KeyError('Aggregator type {} not recognized.'.format(self._aggre_type))

# GraphSAGE GCN does not require fc_self.
if self._aggre_type == 'gcn':
    rst = self.fc_neigh(h_neigh)
else:
    rst = self.fc_self(h_self) + self.fc_neigh(h_neigh)
# activation
if self.activation is not None:
    rst = self.activation(rst)
# normalization
if self.norm is not None:
    rst = self.norm(rst)
return rst
The final part of the forward function updates the node features once message aggregation is done. Splicing the code of steps two and three into the forward function from step one yields the complete layer.
The heterogeneous convolution wrapper is dgl.nn.pytorch.HeteroGraphConv(mods, aggregate='sum'). Its logic mirrors the multi_update_all function: it applies the dgl.nn module registered for each relation, then aggregates the per-relation results for each destination node type, where $\mathrm{AGG}$ denotes the aggregation function.
import torch.nn as nn
class HeteroGraphConv(nn.Module):
    def __init__(self, mods, aggregate='sum'):
        super(HeteroGraphConv, self).__init__()
        self.mods = nn.ModuleDict(mods)
        if isinstance(aggregate, str):
            # An internal function to get common aggregation functions
            self.agg_fn = get_aggregate_fn(aggregate)
        else:
            self.agg_fn = aggregate
mods: a dict whose keys are relation names and whose values are the NN module applied to that relation; aggregate specifies how results from different relations are aggregated. The forward function:
def forward(self, g, inputs, mod_args=None, mod_kwargs=None):
    if mod_args is None:
        mod_args = {}
    if mod_kwargs is None:
        mod_kwargs = {}
    outputs = {nty: [] for nty in g.dsttypes}
    if g.is_block:
        src_inputs = inputs
        dst_inputs = {k: v[:g.number_of_dst_nodes(k)] for k, v in inputs.items()}
    else:
        src_inputs = dst_inputs = inputs
    for stype, etype, dtype in g.canonical_etypes:
        rel_graph = g[stype, etype, dtype]
        if rel_graph.num_edges() == 0:
            continue
        if stype not in src_inputs or dtype not in dst_inputs:
            continue
        dstdata = self.mods[etype](
            rel_graph,
            (src_inputs[stype], dst_inputs[dtype]),
            *mod_args.get(etype, ()),
            **mod_kwargs.get(etype, {}))
        outputs[dtype].append(dstdata)
    rsts = {}
    for nty, alist in outputs.items():
        if len(alist) != 0:
            rsts[nty] = self.agg_fn(alist, nty)
    return rsts  # (the original snippet was truncated; the full source returns rsts)
Besides the graph g and the input tensor dict inputs, forward takes two extra dictionary arguments, mod_args and mod_kwargs; they share their keys with self.mods, and their values are the custom parameters of the corresponding dgl.nn module.
The output of forward is also a dictionary: the keys are node types nty, and each value is built from a list of output tensors for nty, one per relation; HeteroGraphConv aggregates that list further and returns the result to the user.
g may be a heterogeneous graph or a subgraph block taken from one; as with ordinary dgl.nn modules, forward must handle the different input graph types separately.
The for loop carries the main logic of the heterogeneous computation: iterate over the relations (canonical_etypes); extract the subgraph containing only that relation (rel_graph) via g[stype, etype, dtype]; feed in the node features (src_inputs[stype], dst_inputs[dtype]); save the result into the outputs dictionary; and finally aggregate the results from the multiple relations with the self.agg_fn function.
The dgl.data module implements many commonly used graph datasets; they all follow the standard data-processing pipeline defined by the dgl.data.DGLDataset class.
The official documentation recommends subclassing dgl.data.DGLDataset for your own graph data, since the class provides many utility functions for loading, processing and saving graphs.
class dgl.data.DGLDataset(name, url=None, raw_dir=None, save_dir=None, hash_key=(), force_reload=False, verbose=False): see the DGL official documentation. DGLDataset is the base class for processing, loading and saving the graph datasets defined in the dgl.data module; it implements the basic template for handling graph data. The DGLDataset template works as follows:
from dgl.data import DGLDataset

class MyDataset(DGLDataset):
    """ Template for customizing graph datasets in DGL.

    Parameters
    ----------
    url : str
        URL to download the raw dataset
    raw_dir : str
        Specifying the directory that will store the downloaded data
        or the directory that already stores the input data.
        Default: ~/.dgl/
    save_dir : str
        Directory to save the processed dataset.
        Default: the value of `raw_dir`
    force_reload : bool
        Whether to reload the dataset. Default: False
    verbose : bool
        Whether to print out progress information
    """
    def __init__(self,
                 url=None,
                 raw_dir=None,
                 save_dir=None,
                 force_reload=False,
                 verbose=False):
        super(MyDataset, self).__init__(name='dataset_name',
                                        url=url,
                                        raw_dir=raw_dir,
                                        save_dir=save_dir,
                                        force_reload=force_reload,
                                        verbose=verbose)

    def download(self):
        # download raw data to local disk
        pass

    def process(self):
        # process raw data to graphs, labels, splitting masks
        pass

    def __getitem__(self, idx):
        # get one example by index
        pass

    def __len__(self):
        # number of data examples
        pass

    def save(self):
        # save processed data to directory `self.save_path`
        pass

    def load(self):
        # load processed data from directory `self.save_path`
        pass

    def has_cache(self):
        # check whether there are processed data in `self.save_path`
        pass
When subclassing DGLDataset, three abstract functions must be implemented: process(); __getitem__(idx); __len__(). It is also recommended to implement the save() and load() functions, which are used frequently for caching processed data (the detailed interfaces appear in Section 4 of this chapter), plus a download function where needed.
Raw data goes under raw_dir; this convention comes from the parent class DGLDataset. The download function typically fetches data from a remote server; if the dataset is a zip archive, you can instead subclass dgl.data.DGLBuiltinDataset, which supports unzipping, see the QM7bDataset class in the DGL official documentation. A download example:
import os
from dgl.data.utils import download
def download(self):
    # path to store the file
    file_path = os.path.join(self.raw_dir, self.name + '.mat')
    # download file
    download(self.url, path=file_path)
This downloads a .mat file into the directory self.raw_dir. For files ending in .gz, .tar, .tar.gz or .tgz, use dgl.data.utils.extract_archive(file, target_dir, overwrite=False). The following example from the BitcoinOTCDataset class downloads a .gz file:
from dgl.data.utils import download, check_sha1
def download(self):
    # path to store the file
    # make sure to use the same suffix as the original file name's
    gz_file_path = os.path.join(self.raw_dir, self.name + '.csv.gz')
    # download file
    download(self.url, path=gz_file_path)
    # check SHA-1
    if not check_sha1(gz_file_path, self._sha1_str):
        raise UserWarning('File {} is downloaded but the content hash does not match.'
                          'The repo may be outdated or download may be incomplete. '
                          'Otherwise you can create an issue for it.'.format(self.name + '.csv.gz'))
    # extract file to directory `self.name` under `self.raw_dir`
    self._extract_gz(gz_file_path, self.raw_path)
The process function parses the raw data; QM7bDataset serves as the example here, and Section 5.4 lists more datasets. The processed result is a list of dgl.DGLGraph objects and a list of label tensors. Source of the QM7bDataset class:
from dgl.data import DGLDataset

class QM7bDataset(DGLDataset):
    _url = 'http://deepchem.io.s3-website-us-west-1.amazonaws.com/' \
           'datasets/qm7b.mat'
    _sha1_str = '4102c744bb9d6fd7b40ac67a300e49cd87e28392'

    def __init__(self, raw_dir=None, force_reload=False, verbose=False):
        super(QM7bDataset, self).__init__(name='qm7b',
                                          url=self._url,
                                          raw_dir=raw_dir,
                                          force_reload=force_reload,
                                          verbose=verbose)

    def process(self):
        mat_path = self.raw_path + '.mat'
        # process data to a list of graphs and a list of labels
        self.graphs, self.label = self._load_graph(mat_path)

    def __getitem__(self, idx):
        """ Get graph and label by index

        Parameters
        ----------
        idx : int
            Item index

        Returns
        -------
        (dgl.DGLGraph, Tensor)
        """
        return self.graphs[idx], self.label[idx]

    def __len__(self):
        """Number of graphs in the dataset"""
        return len(self.graphs)
process turns the raw data into a list of graphs and a list of labels; __getitem__(idx) and __len__() must be implemented for iteration, with __getitem__(idx) returning the tuple (graph, label) as shown above. See the QM7bDataset source for the details of self._load_graph() and __getitem__. In QM7bDataset one can additionally add a num_labels property indicating the total number of prediction tasks in this multi-task dataset:
@property
def num_labels(self):
    """Number of labels for each graph, i.e. number of prediction tasks."""
    return 14
Using the QM7bDataset class then looks like this, with torch's data loader doing the batching:
import dgl
import torch
from torch.utils.data import DataLoader

# load data
dataset = QM7bDataset()
num_labels = dataset.num_labels

# create collate_fn
def _collate_fn(batch):
    # each element of batch is a (graph, label) pair
    graphs, labels = map(list, zip(*batch))
    g = dgl.batch(graphs)
    labels = torch.stack(labels, 0)
    return g, labels

# create dataloaders
dataloader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=_collate_fn)

# training
for epoch in range(100):
    for g, labels in dataloader:
        # your training code here
        pass
Node classification is usually performed on a single graph, so the dataset split happens over the graph's node set; the official documentation recommends node masks to designate the split. Node classification itself is covered in detail in Chapter 5 Section 1 of this post, and the documentation lists all node-classification datasets. This section uses the built-in CitationGraphDataset as the example:
from dgl.data import DGLBuiltinDataset
from dgl.data.utils import _get_dgl_url, generate_mask_tensor

class CitationGraphDataset(DGLBuiltinDataset):
    _urls = {
        'cora_v2': 'dataset/cora_v2.zip',
        'citeseer': 'dataset/citeseer.zip',
        'pubmed': 'dataset/pubmed.zip',
    }

    def __init__(self, name, raw_dir=None, force_reload=False, verbose=True):
        assert name.lower() in ['cora', 'citeseer', 'pubmed']
        if name.lower() == 'cora':
            name = 'cora_v2'
        url = _get_dgl_url(self._urls[name])
        super(CitationGraphDataset, self).__init__(name,
                                                   url=url,
                                                   raw_dir=raw_dir,
                                                   force_reload=force_reload,
                                                   verbose=verbose)

    def process(self):
        # Skip some processing code
        # === data processing skipped ===

        # build graph
        g = dgl.graph(graph)
        # splitting masks
        g.ndata['train_mask'] = generate_mask_tensor(train_mask)
        g.ndata['val_mask'] = generate_mask_tensor(val_mask)
        g.ndata['test_mask'] = generate_mask_tensor(test_mask)
        # node labels
        g.ndata['label'] = torch.tensor(labels)
        # node features
        g.ndata['feat'] = torch.tensor(_preprocess_features(features),
                                       dtype=F.data_type_dict['float32'])
        self._num_labels = onehot_labels.shape[1]
        self._labels = labels
        self._g = g

    def __getitem__(self, idx):
        assert idx == 0, "This dataset has only one graph"
        return self._g

    def __len__(self):
        return 1
Part of the process function is omitted above to highlight the key piece, the splitting masks; see the CitationGraphDataset source for the rest. Node-classification datasets are then consumed through subclasses of dgl.data.CitationGraphDataset such as dgl.data.CiteseerGraphDataset:
# load data
dataset = CiteseerGraphDataset(raw_dir='')
graph = dataset[0]
# get split masks
train_mask = graph.ndata['train_mask']
val_mask = graph.ndata['val_mask']
test_mask = graph.ndata['test_mask']
# get node features
feats = graph.ndata['feat']
# get labels
labels = graph.ndata['label']
Link prediction datasets are handled much like node classification ones, and a dataset again usually contains a single graph; link prediction itself is covered in Chapter 5 Section 3 of this post. This section uses the built-in KnowledgeGraphDataset as the example:
# Example for creating Link Prediction datasets
class KnowledgeGraphDataset(DGLBuiltinDataset):
    def __init__(self, name, reverse=True, raw_dir=None, force_reload=False, verbose=True):
        self._name = name
        self.reverse = reverse
        url = _get_dgl_url('dataset/') + '{}.tgz'.format(name)
        super(KnowledgeGraphDataset, self).__init__(name,
                                                    url=url,
                                                    raw_dir=raw_dir,
                                                    force_reload=force_reload,
                                                    verbose=verbose)

    def process(self):
        # Skip some processing code
        # === data processing skipped ===

        # splitting mask
        g.edata['train_mask'] = train_mask
        g.edata['val_mask'] = val_mask
        g.edata['test_mask'] = test_mask
        # edge type
        g.edata['etype'] = etype
        # node type
        g.ndata['ntype'] = ntype
        self._g = g

    def __getitem__(self, idx):
        assert idx == 0, "This dataset has only one graph"
        return self._g

    def __len__(self):
        return 1
Here the splitting masks are stored in edata; the full source is at https://docs.dgl.ai/en/0.5.x/_modules/dgl/data/knowledge_graph.html#KnowledgeGraphDataset . Link-prediction datasets are consumed through subclasses of KnowledgeGraphDataset such as dgl.data.FB15k237Dataset:
from dgl.data import FB15k237Dataset
# load data
dataset = FB15k237Dataset()
graph = dataset[0]
# get training mask
train_mask = graph.edata['train_mask']
train_idx = torch.nonzero(train_mask).squeeze()
src, dst = graph.find_edges(train_idx)  # endpoints of the training edges (graph.edges() does not take edge IDs)
# get edge types in training set
rel = graph.edata['etype'][train_idx]
Implementing the save and load functions relies on four APIs:
dgl.save_graphs(filename, g_list, labels=None): save DGLGraph objects (already covered in Section 1.5);
dgl.load_graphs(filename, idx_list=None): load DGLGraph objects from disk (also covered in Section 1.5);
dgl.data.utils.save_info(path, info): save the dataset's auxiliary information (a dict);
dgl.data.utils.load_info(path): read that information back.
import os
from dgl import save_graphs, load_graphs
from dgl.data.utils import makedirs, save_info, load_info

def save(self):
    # save graphs and labels
    graph_path = os.path.join(self.save_path, self.mode + '_dgl_graph.bin')
    save_graphs(graph_path, self.graphs, {'labels': self.labels})
    # save other information in python dict
    info_path = os.path.join(self.save_path, self.mode + '_info.pkl')
    save_info(info_path, {'num_classes': self.num_classes})

def load(self):
    # load processed data from directory `self.save_path`
    graph_path = os.path.join(self.save_path, self.mode + '_dgl_graph.bin')
    self.graphs, label_dict = load_graphs(graph_path)
    self.labels = label_dict['labels']
    info_path = os.path.join(self.save_path, self.mode + '_info.pkl')
    self.num_classes = load_info(info_path)['num_classes']

def has_cache(self):
    # check whether there are processed data in `self.save_path`
    graph_path = os.path.join(self.save_path, self.mode + '_dgl_graph.bin')
    info_path = os.path.join(self.save_path, self.mode + '_info.pkl')
    return os.path.exists(graph_path) and os.path.exists(info_path)
There are cases where saving processed data is not appropriate, e.g. in GDELTDataset the processed data is very large; there it is more efficient to process each data example inside __getitem__(idx).
The ogb library, short for Open Graph Benchmark, is a benchmark dataset collection for graph deep learning. It ships interfaces for downloading ogb datasets and converting them into dgl.data.DGLGraph objects, and installs with a simple pip:
# Load Graph Property Prediction datasets in OGB
import dgl
import torch
from ogb.graphproppred import DglGraphPropPredDataset
from torch.utils.data import DataLoader

def _collate_fn(batch):
    # batch is a list of tuple (graph, label)
    graphs = [e[0] for e in batch]
    g = dgl.batch(graphs)
    labels = [e[1] for e in batch]
    labels = torch.stack(labels, 0)
    return g, labels

# load dataset
dataset = DglGraphPropPredDataset(name='ogbg-molhiv')
split_idx = dataset.get_idx_split()
# dataloader
train_loader = DataLoader(dataset[split_idx["train"]], batch_size=32, shuffle=True, collate_fn=_collate_fn)
valid_loader = DataLoader(dataset[split_idx["valid"]], batch_size=32, shuffle=False, collate_fn=_collate_fn)
test_loader = DataLoader(dataset[split_idx["test"]], batch_size=32, shuffle=False, collate_fn=_collate_fn)
# Load Node Property Prediction datasets in OGB
from ogb.nodeproppred import DglNodePropPredDataset
dataset = DglNodePropPredDataset(name='ogbn-proteins')
split_idx = dataset.get_idx_split()
# there is only one graph in Node Property Prediction datasets
g, labels = dataset[0]
# get split labels
train_label = dataset.labels[split_idx['train']]
valid_label = dataset.labels[split_idx['valid']]
test_label = dataset.labels[split_idx['test']]
# Load Link Property Prediction datasets in OGB
from ogb.linkproppred import DglLinkPropPredDataset
dataset = DglLinkPropPredDataset(name='ogbl-ppa')
split_edge = dataset.get_edge_split()
graph = dataset[0]
print(split_edge['train'].keys())
print(split_edge['valid'].keys())
print(split_edge['test'].keys())
This chapter builds full training pipelines from the dgl.nn module. The homogeneous-graph examples use a built-in citation dataset:
import dgl
dataset = dgl.data.CiteseerGraphDataset()
graph = dataset[0]
The heterogeneous-graph examples use the following synthetic dataset, generated with torch:
import numpy as np
import torch

n_users = 1000
n_items = 500
n_follows = 3000
n_clicks = 5000
n_dislikes = 500
n_hetero_features = 10
n_user_classes = 5
n_max_clicks = 10

follow_src = np.random.randint(0, n_users, n_follows)
follow_dst = np.random.randint(0, n_users, n_follows)
click_src = np.random.randint(0, n_users, n_clicks)
click_dst = np.random.randint(0, n_items, n_clicks)
dislike_src = np.random.randint(0, n_users, n_dislikes)
dislike_dst = np.random.randint(0, n_items, n_dislikes)

hetero_graph = dgl.heterograph({
    ('user', 'follow', 'user'): (follow_src, follow_dst),
    ('user', 'followed-by', 'user'): (follow_dst, follow_src),
    ('user', 'click', 'item'): (click_src, click_dst),
    ('item', 'clicked-by', 'user'): (click_dst, click_src),
    ('user', 'dislike', 'item'): (dislike_src, dislike_dst),
    ('item', 'disliked-by', 'user'): (dislike_dst, dislike_src)})

hetero_graph.nodes['user'].data['feature'] = torch.randn(n_users, n_hetero_features)
hetero_graph.nodes['item'].data['feature'] = torch.randn(n_items, n_hetero_features)
hetero_graph.nodes['user'].data['label'] = torch.randint(0, n_user_classes, (n_users,))
hetero_graph.edges['click'].data['label'] = torch.randint(1, n_max_clicks, (n_clicks,)).float()
# randomly generate training masks on user nodes and click edges
hetero_graph.nodes['user'].data['train_mask'] = torch.zeros(n_users, dtype=torch.bool).bernoulli(0.6)
hetero_graph.edges['click'].data['train_mask'] = torch.zeros(n_clicks, dtype=torch.bool).bernoulli(0.6)
hetero_graph contains the following edge types: ('user', 'follow', 'user'); ('user', 'followed-by', 'user'); ('user', 'click', 'item'); ('item', 'clicked-by', 'user'); ('user', 'dislike', 'item'); ('item', 'disliked-by', 'user').
Node classification is one of the hottest problems in graph neural network research: given a graph, predict the class label of every node. To classify a node, the GNN performs message passing that exploits the node's own features as well as those of its neighboring nodes and edges.
The paper behind GitHub@RE-Net performs aggregation over neighborhoods of up to k hops, so message passing need not be single-hop; it can be multi-hop, though each additional hop greatly increases the cost of message passing. By increasing the number of message-passing rounds, each node's and edge's information can propagate to essentially every corner of the graph.
The dgl library provides built-in graph convolution modules that each perform one round of message passing. Taking dgl.nn.pytorch.SAGEConv as the example, the SAGE class below stacks two convolution layers, i.e. two rounds of message passing:
# Construct a two-layer GNN model
import dgl.nn as dglnn
import torch.nn as nn
import torch.nn.functional as F

class SAGE(nn.Module):
    def __init__(self, in_feats, hid_feats, out_feats):
        super().__init__()
        self.conv1 = dglnn.SAGEConv(
            in_feats=in_feats, out_feats=hid_feats, aggregator_type='mean')
        self.conv2 = dglnn.SAGEConv(
            in_feats=hid_feats, out_feats=out_feats, aggregator_type='mean')

    def forward(self, graph, inputs):
        # inputs are features of nodes
        h = self.conv1(graph, inputs)
        h = F.relu(h)
        h = self.conv2(graph, h)
        return h
Training is no different from ordinary torch model training: forward pass; optimizer.zero_grad(); loss.backward(); optimizer.step(). The walkthrough below uses dgl.data.CiteseerGraphDataset; the loading of graph is omitted here, see Chapter 4 of this post for dataset loading.
node_features = graph.ndata['feat']
node_labels = graph.ndata['label']
train_mask = graph.ndata['train_mask']
valid_mask = graph.ndata['val_mask']
test_mask = graph.ndata['test_mask']
n_features = node_features.shape[1]
n_labels = int(node_labels.max().item() + 1)
def evaluate(model, graph, features, labels, mask):
    model.eval()
    with torch.no_grad():
        logits = model(graph, features)
        logits = logits[mask]
        labels = labels[mask]
        _, indices = torch.max(logits, dim=1)
        correct = torch.sum(indices == labels)
        return correct.item() * 1.0 / len(labels)
model = SAGE(in_feats=n_features, hid_feats=100, out_feats=n_labels)
opt = torch.optim.Adam(model.parameters())

for epoch in range(10):
    model.train()
    # forward propagation by using all nodes
    logits = model(graph, node_features)
    # compute loss
    loss = F.cross_entropy(logits[train_mask], node_labels[train_mask])
    # compute validation accuracy
    acc = evaluate(model, graph, node_features, node_labels, valid_mask)
    # backward propagation
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())

    # Save model if necessary.  Omitted in this example.
For heterogeneous graphs, node classification is implemented with dgl.nn.pytorch.HeteroGraphConv: message passing runs on each edge type separately (self.conv1), and the per-edge-type aggregation results are summed as the final result for all node types (self.conv2):
# Define a Heterograph Conv model
import dgl.nn as dglnn

class RGCN(nn.Module):
    def __init__(self, in_feats, hid_feats, out_feats, rel_names):
        super().__init__()
        self.conv1 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(in_feats, hid_feats)
            for rel in rel_names}, aggregate='sum')
        self.conv2 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(hid_feats, out_feats)
            for rel in rel_names}, aggregate='sum')

    def forward(self, graph, inputs):
        # inputs are features of nodes
        h = self.conv1(graph, inputs)
        h = {k: F.relu(v) for k, v in h.items()}
        h = self.conv2(graph, h)
        return h

model = RGCN(n_hetero_features, 20, n_user_classes, hetero_graph.etypes)

user_feats = hetero_graph.nodes['user'].data['feature']
item_feats = hetero_graph.nodes['item'].data['feature']
labels = hetero_graph.nodes['user'].data['label']
train_mask = hetero_graph.nodes['user'].data['train_mask']

node_features = {'user': user_feats, 'item': item_feats}
h_dict = model(hetero_graph, {'user': user_feats, 'item': item_feats})
h_user = h_dict['user']
h_item = h_dict['item']

opt = torch.optim.Adam(model.parameters())

for epoch in range(5):
    model.train()
    # forward propagation by using all nodes and extracting the user embeddings
    logits = model(hetero_graph, node_features)['user']
    # compute loss
    loss = F.cross_entropy(logits[train_mask], labels[train_mask])
    # Compute validation accuracy.  Omitted in this example.
    # backward propagation
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())

    # Save model if necessary.  Omitted in the example.
The implementation of the RelGraphConvLayer class can be found at dgl@GitHub.
A typical edge classification problem is relation prediction in knowledge graphs. Edge classification/regression is broadly similar to node classification/regression, since an edge's prediction can be obtained by some aggregation over the representations of its endpoint nodes, but there are a few differences. A synthetic dataset:
import dgl
import numpy as np
src = np.random.randint(0, 100, 500)
dst = np.random.randint(0, 100, 500)
# make it symmetric
edge_pred_graph = dgl.graph((np.concatenate([src, dst]), np.concatenate([dst, src])))
# synthetic node and edge features, as well as edge labels
edge_pred_graph.ndata['feature'] = torch.randn(100, 10)
edge_pred_graph.edata['feature'] = torch.randn(1000, 10)
edge_pred_graph.edata['label'] = torch.randn(1000)
# synthetic train-validation-test splits
edge_pred_graph.edata['train_mask'] = torch.zeros(1000, dtype=torch.bool).bernoulli(0.6)
To predict on edges, compute edge scores with the apply_edges() method; a simple example takes the dot product of the two endpoint representations as the edge's predicted feature:
import dgl.function as fn
class DotProductPredictor(nn.Module):
    def forward(self, graph, h):
        # h contains the node representations computed from the GNN defined
        # in the node classification section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            return graph.edata['score']
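Usage is direct; a sketch that pairs it with the SAGE model from Section 5.1 on the synthetic edge_pred_graph (dimensions follow the data defined above):
sage = SAGE(in_feats=10, hid_feats=16, out_feats=8)  # SAGE class from Section 5.1
h = sage(edge_pred_graph, edge_pred_graph.ndata['feature'])
scores = DotProductPredictor()(edge_pred_graph, h)   # one score per edge, shape (num_edges, 1)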
class MLPPredictor(nn.Module):
    def __init__(self, in_features, out_classes):
        super().__init__()
        self.W = nn.Linear(in_features * 2, out_classes)

    def apply_edges(self, edges):
        h_u = edges.src['h']
        h_v = edges.dst['h']
        score = self.W(torch.cat([h_u, h_v], 1))
        return {'score': score}

    def forward(self, graph, h):
        # h contains the node representations computed from the GNN defined
        # in the node classification section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(self.apply_edges)
            return graph.edata['score']
The full model uses the SAGE model for node representations and DotProductPredictor as the edge predictor:
class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.sage = SAGE(in_features, hidden_features, out_features)
        self.pred = DotProductPredictor()

    def forward(self, g, x):
        h = self.sage(g, x)
        return self.pred(g, h)

node_features = edge_pred_graph.ndata['feature']
edge_label = edge_pred_graph.edata['label']
train_mask = edge_pred_graph.edata['train_mask']
model = Model(10, 20, 5)
opt = torch.optim.Adam(model.parameters())
for epoch in range(10):
    pred = model(edge_pred_graph, node_features)
    loss = ((pred[train_mask] - edge_label[train_mask]) ** 2).mean()
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
On heterogeneous graphs, simply pass the edge type to the apply_edges method as an extra argument. Adapting the DotPredictor example to the heterogeneous case:
from dgl import function as fn
class HeteroDotProductPredictor(nn.Module):
    def forward(self, graph, h, etype):
        # h contains the node representations for each edge type computed from
        # the GNN for heterogeneous graphs defined in the node classification
        # section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h  # assigns 'h' of all node types in one shot
            graph.apply_edges(fn.u_dot_v('h', 'h', 'score'), etype=etype)
            return graph.edges[etype].data['score']
And similarly an MLP-based variant that threads the edge type through:
class MLPPredictor(nn.Module):
    def __init__(self, in_features, out_classes):
        super().__init__()
        self.W = nn.Linear(in_features * 2, out_classes)

    def apply_edges(self, edges):
        h_u = edges.src['h']
        h_v = edges.dst['h']
        score = self.W(torch.cat([h_u, h_v], 1))
        return {'score': score}

    def forward(self, graph, h, etype):
        # h contains the node representations for each edge type computed from
        # the GNN for heterogeneous graphs defined in the node classification
        # section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h  # assigns 'h' of all node types in one shot
            graph.apply_edges(self.apply_edges, etype=etype)
            return graph.edges[etype].data['score']
class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features, rel_names):
        super().__init__()
        self.sage = RGCN(in_features, hidden_features, out_features, rel_names)
        self.pred = HeteroDotProductPredictor()

    def forward(self, g, x, etype):
        h = self.sage(g, x)
        return self.pred(g, h, etype)

model = Model(10, 20, 5, hetero_graph.etypes)
user_feats = hetero_graph.nodes['user'].data['feature']
item_feats = hetero_graph.nodes['item'].data['feature']
label = hetero_graph.edges['click'].data['label']
train_mask = hetero_graph.edges['click'].data['train_mask']
node_features = {'user': user_feats, 'item': item_feats}

opt = torch.optim.Adam(model.parameters())
for epoch in range(10):
    pred = model(hetero_graph, node_features, 'click')
    loss = ((pred[train_mask] - label[train_mask]) ** 2).mean()
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
The model above reuses the RGCN class for node representations and HeteroDotProductPredictor for scoring. A related task is predicting which relation type an existing edge belongs to (e.g. rating prediction): first extract a graph that keeps all edges between the two node types of interest (user and item), so that the edge labels can be read straight out of dgl.ETYPE:
dec_graph = hetero_graph['user', :, 'item']
edge_label = dec_graph.edata[dgl.ETYPE]
The predictor then outputs one score per relation class, a HeteroMLPPredictor variant:
class HeteroMLPPredictor(nn.Module):
    def __init__(self, in_dims, n_classes):
        super().__init__()
        self.W = nn.Linear(in_dims * 2, n_classes)

    def apply_edges(self, edges):
        x = torch.cat([edges.src['h'], edges.dst['h']], 1)
        y = self.W(x)
        return {'score': y}

    def forward(self, graph, h):
        # h contains the node representations for each edge type computed from
        # the GNN for heterogeneous graphs defined in the node classification
        # section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h  # assigns 'h' of all node types in one shot
            graph.apply_edges(self.apply_edges)
            return graph.edata['score']
class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features, rel_names):
        super().__init__()
        self.sage = RGCN(in_features, hidden_features, out_features, rel_names)
        self.pred = HeteroMLPPredictor(out_features, len(rel_names))

    def forward(self, g, x, dec_graph):
        h = self.sage(g, x)
        return self.pred(dec_graph, h)

model = Model(10, 20, 5, hetero_graph.etypes)
user_feats = hetero_graph.nodes['user'].data['feature']
item_feats = hetero_graph.nodes['item'].data['feature']
node_features = {'user': user_feats, 'item': item_feats}

opt = torch.optim.Adam(model.parameters())
for epoch in range(10):
    logits = model(hetero_graph, node_features, dec_graph)
    loss = F.cross_entropy(logits, edge_label)
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
See the official example @GitHub.
The data-loading parts are omitted throughout this section; by this point you may well have forgotten how the data was loaded above, and since most dgl built-in datasets have to be downloaded from servers abroad, which is slow, testing on one or two typical ones is enough; project datasets will need to be processed and generated by hand anyway. Most heterogeneous-graph examples in this chapter reuse the snippet at the beginning of the chapter.
Link prediction trains a model to score whether an edge should exist between two nodes; the same DotProductPredictor can be reused, together with a negatively sampled graph:
class DotProductPredictor(nn.Module):
    def forward(self, graph, h):
        # h contains the node representations computed from the GNN defined
        # in the node classification section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            return graph.edata['score']
def construct_negative_graph(graph, k):
    src, dst = graph.edges()
    neg_src = src.repeat_interleave(k)
    neg_dst = torch.randint(0, graph.number_of_nodes(), (len(src) * k,))
    return dgl.graph((neg_src, neg_dst), num_nodes=graph.number_of_nodes())
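A quick sanity check of the sketch above:
import dgl
import torch

g_small = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))
neg_g = construct_negative_graph(g_small, k=3)
print(neg_g.number_of_edges())  # 2 positive edges * k = 6 negative edges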
class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.sage = SAGE(in_features, hidden_features, out_features)
        self.pred = DotProductPredictor()

    def forward(self, g, neg_g, x):
        h = self.sage(g, x)
        return self.pred(g, h), self.pred(neg_g, h)

def compute_loss(pos_score, neg_score):
    # Margin loss: penalize negative edges scoring within a margin of positive ones
    n_edges = pos_score.shape[0]
    return (1 - pos_score.unsqueeze(1) + neg_score.view(n_edges, -1)).clamp(min=0).mean()

node_features = graph.ndata['feat']
n_features = node_features.shape[1]
k = 5
model = Model(n_features, 100, 100)
opt = torch.optim.Adam(model.parameters())
for epoch in range(10):
    negative_graph = construct_negative_graph(graph, k)
    pos_score, neg_score = model(graph, negative_graph, node_features)
    loss = compute_loss(pos_score, neg_score)
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
This omits the part that loads graph.ndata['feat']; any built-in dataset will do. The heterogeneous version scores edges of one type, again with HeteroDotProductPredictor:
class HeteroDotProductPredictor(nn.Module):
    def forward(self, graph, h, etype):
        # h contains the node representations for each node type computed from
        # the GNN defined in the previous section (Section 5.1).
        with graph.local_scope():
            graph.ndata['h'] = h
            graph.apply_edges(fn.u_dot_v('h', 'h', 'score'), etype=etype)
            return graph.edges[etype].data['score']
def construct_negative_graph(graph, k, etype):
    utype, _, vtype = etype
    src, dst = graph.edges(etype=etype)
    neg_src = src.repeat_interleave(k)
    neg_dst = torch.randint(0, graph.number_of_nodes(vtype), (len(src) * k,))
    return dgl.heterograph(
        {etype: (neg_src, neg_dst)},
        num_nodes_dict={ntype: graph.number_of_nodes(ntype) for ntype in graph.ntypes})
class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features, rel_names):
        super().__init__()
        self.sage = RGCN(in_features, hidden_features, out_features, rel_names)
        self.pred = HeteroDotProductPredictor()

    def forward(self, g, neg_g, x, etype):
        h = self.sage(g, x)
        return self.pred(g, h, etype), self.pred(neg_g, h, etype)

def compute_loss(pos_score, neg_score):
    # Margin loss: penalize negative edges scoring within a margin of positive ones
    n_edges = pos_score.shape[0]
    return (1 - pos_score.unsqueeze(1) + neg_score.view(n_edges, -1)).clamp(min=0).mean()

k = 5
model = Model(10, 20, 5, hetero_graph.etypes)
user_feats = hetero_graph.nodes['user'].data['feature']
item_feats = hetero_graph.nodes['item'].data['feature']
node_features = {'user': user_feats, 'item': item_feats}
opt = torch.optim.Adam(model.parameters())
for epoch in range(10):
    negative_graph = construct_negative_graph(hetero_graph, k, ('user', 'click', 'item'))
    pos_score, neg_score = model(hetero_graph, negative_graph, node_features, ('user', 'click', 'item'))
    loss = compute_loss(pos_score, neg_score)
    opt.zero_grad()
    loss.backward()
    opt.step()
    print(loss.item())
Sometimes classification must be done over many graphs, e.g. grouping people into communities: defining the interpersonal relations within different groups yields many graphs to classify. The dgl.batch function essentially treats a batch of graphs as one big graph (the official documentation illustrates this with a figure), and dgl provides a set of built-in readout operations such as dgl.readout_nodes():
import dgl
import torch

g1 = dgl.graph(([0, 1], [1, 0]))
g1.ndata['h'] = torch.tensor([1., 2.])
g2 = dgl.graph(([0, 1], [1, 2]))
g2.ndata['h'] = torch.tensor([1., 2., 3.])

print(dgl.readout_nodes(g1, 'h'))
# tensor([3.])  # 1 + 2

bg = dgl.batch([g1, g2])
print(dgl.readout_nodes(bg, 'h'))
# tensor([3., 6.])  # [1 + 2, 1 + 2 + 3]

print(bg.ndata['h'])
# tensor([1., 2., 1., 2., 3.])
import dgl.nn.pytorch as dglnn
import torch.nn as nn

class Classifier(nn.Module):
    def __init__(self, in_dim, hidden_dim, n_classes):
        super(Classifier, self).__init__()
        self.conv1 = dglnn.GraphConv(in_dim, hidden_dim)
        self.conv2 = dglnn.GraphConv(hidden_dim, hidden_dim)
        self.classify = nn.Linear(hidden_dim, n_classes)

    def forward(self, g, h):
        # Apply graph convolution and activation.
        h = F.relu(self.conv1(g, h))
        h = F.relu(self.conv2(g, h))
        with g.local_scope():
            g.ndata['h'] = h
            # Calculate graph representation by average readout.
            hg = dgl.mean_nodes(g, 'h')
            return self.classify(hg)
import dgl.data

dataset = dgl.data.GINDataset('MUTAG', False)

def collate(samples):
    graphs, labels = map(list, zip(*samples))
    batched_graph = dgl.batch(graphs)
    batched_labels = torch.tensor(labels)
    return batched_graph, batched_labels

from torch.utils.data import DataLoader
dataloader = DataLoader(
    dataset,
    batch_size=1024,
    collate_fn=collate,
    drop_last=False,
    shuffle=True)
import torch.nn.functional as F

# Only an example, 7 is the input feature size
model = Classifier(7, 20, 5)
opt = torch.optim.Adam(model.parameters())
for epoch in range(20):
    for batched_graph, labels in dataloader:
        feats = batched_graph.ndata['attr'].float()
        logits = model(batched_graph, feats)
        loss = F.cross_entropy(logits, labels)
        opt.zero_grad()
        loss.backward()
        opt.step()
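Evaluation can reuse the same batching; a sketch that simply measures accuracy over the dataloader (no held-out split is defined above, so this is illustrative only):
import torch

model.eval()
correct = total = 0
with torch.no_grad():
    for batched_graph, labels in dataloader:
        feats = batched_graph.ndata['attr'].float()
        logits = model(batched_graph, feats)
        correct += (logits.argmax(1) == labels).sum().item()
        total += len(labels)
print('accuracy: %.4f' % (correct / total))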
For heterogeneous graphs, graph classification reuses the RGCN code as the backbone:
class RGCN(nn.Module):
    def __init__(self, in_feats, hid_feats, out_feats, rel_names):
        super().__init__()
        self.conv1 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(in_feats, hid_feats)
            for rel in rel_names}, aggregate='sum')
        self.conv2 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(hid_feats, out_feats)
            for rel in rel_names}, aggregate='sum')

    def forward(self, graph, inputs):
        # inputs is features of nodes
        h = self.conv1(graph, inputs)
        h = {k: F.relu(v) for k, v in h.items()}
        h = self.conv2(graph, h)
        return h

class HeteroClassifier(nn.Module):
    def __init__(self, in_dim, hidden_dim, n_classes, rel_names):
        super().__init__()
        self.rgcn = RGCN(in_dim, hidden_dim, hidden_dim, rel_names)
        self.classify = nn.Linear(hidden_dim, n_classes)

    def forward(self, g):
        h = g.ndata['feat']
        h = self.rgcn(g, h)
        with g.local_scope():
            g.ndata['h'] = h
            # Calculate graph representation by average readout.
            hg = 0
            for ntype in g.ntypes:
                hg = hg + dgl.mean_nodes(g, 'h', ntype=ntype)
            return self.classify(hg)

# etypes is the list of edge types as strings.
model = HeteroClassifier(10, 20, 5, etypes)
opt = torch.optim.Adam(model.parameters())
for epoch in range(20):
    for batched_graph, labels in dataloader:
        logits = model(batched_graph)
        loss = F.cross_entropy(logits, labels)
        opt.zero_grad()
        loss.backward()
        opt.step()
This chapter therefore introduces stochastic mini-batch training, so that the node features of the entire graph never need to be loaded into GPU memory at once.
Neighborhood Sampling Approaches: see dgl.sampling, https://docs.dgl.ai/api/python/dgl.sampling.html ; the dgl.sampling module provides several neighborhood sampling methods. Stochastic training takes three steps:
① define a neighborhood sampler;
② define a model capable of mini-batch training;
③ adjust the logic of the model's training loop.
The dgl library defines several built-in neighborhood sampler classes; MultiLayerFullNeighborSampler, for one, lets each node aggregate messages from all of its neighbors. A dgl sampler must be used together with NodeDataLoader, which iterates over node sets in mini-batches. The dataloader below iterates over train_nids and loads the generated lists of blocks onto the GPU:
import dgl
import dgl.nn as dglnn
import torch
import torch.nn as nn
import torch.nn.functional as F
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
dataloader = dgl.dataloading.NodeDataLoader(
g, train_nids, sampler,
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=4)
input_nodes, output_nodes, blocks = next(iter(dataloader))
print(blocks)
The relevant signatures are class dgl.dataloading.neighbor.MultiLayerFullNeighborSampler(n_layers, return_eids=False) and class dgl.dataloading.pytorch.NodeDataLoader(g, nids, block_sampler, **kwargs). nids are the seed nodes: fixing them as the outputs of the final layer determines which input nodes actually have to be sampled. Concretely, input_nodes are the nodes needed to compute the representations of output_nodes; output_nodes are the outputs of the batch; and blocks describe, for each GNN layer, which node representations are computed as outputs, which are required as inputs, and how the input representations propagate to the outputs.
I found this code hard to get running: using the karate-club graph below and many different choices of train_nids, it kept raising a very long Runtime Error whose last line was RuntimeError: DataLoader worker (pid(s) 11136, 10940, 4672, 1352) exited unexpectedly. The code is below; I will come back to this later rather than stay stuck here.
def build_karate_club_graph():
    # All 78 edges are stored in two numpy arrays. One for source endpoints
    # while the other for destination endpoints.
    src = np.array([1, 2, 2, 3, 3, 3, 4, 5, 6, 6, 6, 7, 7, 7, 7, 8, 8, 9, 10, 10,
                    10, 11, 12, 12, 13, 13, 13, 13, 16, 16, 17, 17, 19, 19, 21, 21,
                    25, 25, 27, 27, 27, 28, 29, 29, 30, 30, 31, 31, 31, 31, 32, 32,
                    32, 32, 32, 32, 32, 32, 32, 32, 32, 33, 33, 33, 33, 33, 33, 33,
                    33, 33, 33, 33, 33, 33, 33, 33, 33, 33])
    dst = np.array([0, 0, 1, 0, 1, 2, 0, 0, 0, 4, 5, 0, 1, 2, 3, 0, 2, 2, 0, 4,
                    5, 0, 0, 3, 0, 1, 2, 3, 5, 6, 0, 1, 0, 1, 0, 1,
                    23, 24, 2, 23, 24, 2, 23, 26, 1, 8, 0, 24, 25, 28, 2, 8,
                    14, 15, 18, 20, 22, 23, 29, 30, 31, 8, 9, 13, 14, 15, 18, 19,
                    20, 22, 23, 26, 27, 28, 29, 30, 31, 32])
    # Edges are directional in DGL; Make them bi-directional.
    u = np.concatenate([src, dst])
    v = np.concatenate([dst, src])
    # Construct a DGLGraph
    return dgl.DGLGraph((u, v))

g = build_karate_club_graph()
train_nids = torch.tensor([0])
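A hedged guess at the error above: on Windows, PyTorch DataLoader worker subprocesses often die with exactly this "worker exited unexpectedly" message, so running the sampler in the main process via num_workers=0 may be worth trying:

# Assumption: the crash is caused by multiprocessing workers (common on Windows).
# num_workers=0 keeps all sampling in the main process.
dataloader = dgl.dataloading.NodeDataLoader(
    g, train_nids, sampler,
    batch_size=1024,
    shuffle=True,
    drop_last=False,
    num_workers=0)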
If the model only uses dgl's built-in modules, adapting it to mini-batch training is straightforward; take a two-layer GCN as an example:
class TwoLayerGCN(nn.Module):
def __init__(self, in_features, hidden_features, out_features):
super().__init__()
self.conv1 = dglnn.GraphConv(in_features, hidden_features)
self.conv2 = dglnn.GraphConv(hidden_features, out_features)
def forward(self, g, x):
x = F.relu(self.conv1(g, x))
x = F.relu(self.conv2(g, x))
return x
To make it stochastic, it suffices to replace g with the blocks generated in the previous step:
class StochasticTwoLayerGCN(nn.Module):
def __init__(self, in_features, hidden_features, out_features):
super().__init__()
self.conv1 = dgl.nn.GraphConv(in_features, hidden_features)
self.conv2 = dgl.nn.GraphConv(hidden_features, out_features)
def forward(self, blocks, x):
x = F.relu(self.conv1(blocks[0], x))
x = F.relu(self.conv2(blocks[1], x))
return x
Check the documentation of the individual modules in dgl.nn to see whether a module accepts blocks as its graph argument; not every module can be adapted this way, but most can, e.g. the GraphConv module;
model = StochasticTwoLayerGCN(in_features, hidden_features, out_features)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())
for input_nodes, output_nodes, blocks in dataloader:
blocks = [b.to(torch.device('cuda')) for b in blocks]
input_features = blocks[0].srcdata['features']
output_labels = blocks[-1].dstdata['label']
output_predictions = model(blocks, input_features)
loss = compute_loss(output_labels, output_predictions)
opt.zero_grad()
loss.backward()
opt.step()
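compute_loss is left undefined in the guide; for node classification it could simply be cross entropy — a minimal sketch (note the label-first argument order used in the loop above, which is an assumption here):

# Hypothetical helper, not part of DGL: matches compute_loss(labels, logits)
# as called in the training loop above.
def compute_loss(labels, logits):
    return F.cross_entropy(logits, labels)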
If the input node features are stored in g.ndata, they can be fetched via blocks[0].srcdata, i.e. the input features live in the first block;
If the output node labels are stored in g.ndata, they can be fetched via blocks[-1].dstdata, i.e. the output labels live in the last block;
For heterogeneous graphs the model is adapted in the same way:

class StochasticTwoLayerRGCN(nn.Module):
    def __init__(self, in_feat, hidden_feat, out_feat, rel_names):
        super().__init__()
        self.conv1 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(in_feat, hidden_feat, norm='right')
            for rel in rel_names})
        self.conv2 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(hidden_feat, out_feat, norm='right')
            for rel in rel_names})

    def forward(self, blocks, x):
        x = self.conv1(blocks[0], x)
        x = self.conv2(blocks[1], x)
        return x
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
dataloader = dgl.dataloading.NodeDataLoader(
g, train_nid_dict, sampler,
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=4)
model = StochasticTwoLayerRGCN(in_features, hidden_features, out_features, etypes)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())
for input_nodes, output_nodes, blocks in dataloader:
blocks = [b.to(torch.device('cuda')) for b in blocks]
input_features = blocks[0].srcdata # returns a dict
output_labels = blocks[-1].dstdata # returns a dict
output_predictions = model(blocks, input_features)
loss = compute_loss(output_labels, output_predictions)
opt.zero_grad()
loss.backward()
opt.step()
Edge classification is largely analogous to node classification;
Replace train_nids with train_eids; the remaining details are the same as in the corresponding part of the previous section:
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
dataloader = dgl.dataloading.EdgeDataLoader(
g, train_eid_dict, sampler,
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=4)
class dgl.dataloading.pytorch.EdgeDataLoader(g, eids, block_sampler, **kwargs)
: see the DGL official docs; they note: When training edge classification models, sometimes you wish to remove the edges appearing in the training data from the computation dependency as if they never existed. Otherwise, the model will 'know' the fact that an edge exists between the two nodes, and potentially use it for advantage.
This can be achieved through EdgeDataLoader's constructor argument exclude='reverse_id'; since the graph here was built by concatenating the edge list with its reverse, edge i and edge i + n_edges//2 are each other's reverses, which is exactly what the reverse_eids tensor below encodes:
n_edges = g.number_of_edges()
dataloader = dgl.dataloading.EdgeDataLoader(
g, train_eid_dict, sampler,
# The following two arguments are specifically for excluding the minibatch
# edges and their reverse edges from the original graph for neighborhood
# sampling.
exclude='reverse_id',
reverse_eids=torch.cat([
torch.arange(n_edges // 2, n_edges), torch.arange(0, n_edges // 2)]),
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=4)
class StochasticTwoLayerGCN(nn.Module):
def __init__(self, in_features, hidden_features, out_features):
super().__init__()
self.conv1 = dglnn.GraphConv(in_features, hidden_features)
self.conv2 = dglnn.GraphConv(hidden_features, out_features)
def forward(self, blocks, x):
x = F.relu(self.conv1(blocks[0], x))
x = F.relu(self.conv2(blocks[1], x))
return x
class ScorePredictor(nn.Module):
def __init__(self, num_classes, in_features):
super().__init__()
self.W = nn.Linear(2 * in_features, num_classes)
def apply_edges(self, edges):
data = torch.cat([edges.src['x'], edges.dst['x']], 1)
return {'score': self.W(data)}
def forward(self, edge_subgraph, x):
with edge_subgraph.local_scope():
edge_subgraph.ndata['x'] = x
edge_subgraph.apply_edges(self.apply_edges)
return edge_subgraph.edata['score']
dgl.DGLHeteroGraph.apply_edges() computes the scores on the edges of the subgraph; putting the two modules together:
class Model(nn.Module):
def __init__(self, in_features, hidden_features, out_features, num_classes):
super().__init__()
self.gcn = StochasticTwoLayerGCN(
in_features, hidden_features, out_features)
self.predictor = ScorePredictor(num_classes, out_features)
def forward(self, edge_subgraph, blocks, x):
x = self.gcn(blocks, x)
return self.predictor(edge_subgraph, x)
dgl guarantees that the nodes of the edge subgraph are the same as the output nodes of the last block;
model = Model(in_features, hidden_features, out_features, num_classes)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())
for input_nodes, edge_subgraph, blocks in dataloader:
blocks = [b.to(torch.device('cuda')) for b in blocks]
edge_subgraph = edge_subgraph.to(torch.device('cuda'))
input_features = blocks[0].srcdata['features']
edge_labels = edge_subgraph.edata['labels']
edge_predictions = model(edge_subgraph, blocks, input_features)
loss = compute_loss(edge_labels, edge_predictions)
opt.zero_grad()
loss.backward()
opt.step()
On heterogeneous graphs the node representation model is unchanged:

class StochasticTwoLayerRGCN(nn.Module):
    def __init__(self, in_feat, hidden_feat, out_feat, rel_names):
        super().__init__()
        self.conv1 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(in_feat, hidden_feat, norm='right')
            for rel in rel_names})
        self.conv2 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(hidden_feat, out_feat, norm='right')
            for rel in rel_names})

    def forward(self, blocks, x):
        x = self.conv1(blocks[0], x)
        x = self.conv2(blocks[1], x)
        return x
The score predictor only differs in that apply_edges() is called once per edge type:

class ScorePredictor(nn.Module):
    def __init__(self, num_classes, in_features):
        super().__init__()
        self.W = nn.Linear(2 * in_features, num_classes)

    def apply_edges(self, edges):
        data = torch.cat([edges.src['x'], edges.dst['x']], 1)
        return {'score': self.W(data)}

    def forward(self, edge_subgraph, x):
        with edge_subgraph.local_scope():
            edge_subgraph.ndata['x'] = x
            for etype in edge_subgraph.canonical_etypes:
                edge_subgraph.apply_edges(self.apply_edges, etype=etype)
            return edge_subgraph.edata['score']

class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features, num_classes, etypes):
        super().__init__()
        self.rgcn = StochasticTwoLayerRGCN(
            in_features, hidden_features, out_features, etypes)
        self.pred = ScorePredictor(num_classes, out_features)

    def forward(self, edge_subgraph, blocks, x):
        x = self.rgcn(blocks, x)
        return self.pred(edge_subgraph, x)
The dataloader again just swaps NodeDataLoader for EdgeDataLoader:
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
dataloader = dgl.dataloading.EdgeDataLoader(
g, train_eid_dict, sampler,
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=4)
To exclude the reverse edges of different edge types, use exclude='reverse_types' together with a mapping between edge types:
dataloader = dgl.dataloading.EdgeDataLoader(
g, train_eid_dict, sampler,
# The following two arguments are specifically for excluding the minibatch
# edges and their reverse edges from the original graph for neighborhood
# sampling.
exclude='reverse_types',
reverse_etypes={'follow': 'followed by', 'followed by': 'follow',
'purchase': 'purchased by', 'purchased by': 'purchase'},
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=4)
model = Model(in_features, hidden_features, out_features, num_classes, etypes)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())
for input_nodes, edge_subgraph, blocks in dataloader:
blocks = [b.to(torch.device('cuda')) for b in blocks]
edge_subgraph = edge_subgraph.to(torch.device('cuda'))
input_features = blocks[0].srcdata['features']
edge_labels = edge_subgraph.edata['labels']
edge_predictions = model(edge_subgraph, blocks, input_features)
loss = compute_loss(edge_labels, edge_predictions)
opt.zero_grad()
loss.backward()
opt.step()
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
dataloader = dgl.dataloading.EdgeDataLoader(
g, train_seeds, sampler,
negative_sampler=dgl.dataloading.negative_sampler.Uniform(5),
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
pin_memory=True,
num_workers=args.num_workers)
Note that the argument here is no longer train_nids or train_eids but train_seeds; dgl.dataloading.negative_sampler.Uniform performs uniform negative sampling; a custom negative sampler, e.g. one that draws negative destinations proportionally to in-degree raised to the power 0.75, looks like this:

class NegativeSampler(object):
    def __init__(self, g, k):
        # caches the probability distribution
        self.weights = g.in_degrees().float() ** 0.75
        self.k = k

    def __call__(self, g, eids):
        src, _ = g.find_edges(eids)
        src = src.repeat_interleave(self.k)
        dst = self.weights.multinomial(len(src), replacement=True)
        return src, dst

dataloader = dgl.dataloading.EdgeDataLoader(
    g, train_seeds, sampler,
    negative_sampler=NegativeSampler(g, 5),
    batch_size=args.batch_size,
    shuffle=True,
    drop_last=False,
    pin_memory=True,
    num_workers=args.num_workers)
class StochasticTwoLayerGCN(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.conv1 = dgl.nn.GraphConv(in_features, hidden_features)
        self.conv2 = dgl.nn.GraphConv(hidden_features, out_features)

    def forward(self, blocks, x):
        x = F.relu(self.conv1(blocks[0], x))
        x = F.relu(self.conv2(blocks[1], x))
        return x

class ScorePredictor(nn.Module):
    def forward(self, edge_subgraph, x):
        with edge_subgraph.local_scope():
            edge_subgraph.ndata['x'] = x
            edge_subgraph.apply_edges(dgl.function.u_dot_v('x', 'x', 'score'))
            return edge_subgraph.edata['score']
class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.gcn = StochasticTwoLayerGCN(
            in_features, hidden_features, out_features)
        self.predictor = ScorePredictor()

    def forward(self, positive_graph, negative_graph, blocks, x):
        x = self.gcn(blocks, x)
        pos_score = self.predictor(positive_graph, x)
        neg_score = self.predictor(negative_graph, x)
        return pos_score, neg_score
model = Model(in_features, hidden_features, out_features)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())
for input_nodes, positive_graph, negative_graph, blocks in dataloader:
blocks = [b.to(torch.device('cuda')) for b in blocks]
positive_graph = positive_graph.to(torch.device('cuda'))
negative_graph = negative_graph.to(torch.device('cuda'))
input_features = blocks[0].srcdata['features']
pos_score, neg_score = model(positive_graph, negative_graph, blocks, input_features)
loss = compute_loss(pos_score, neg_score)
opt.zero_grad()
loss.backward()
opt.step()
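For link prediction compute_loss is again undefined; a common choice (an assumption here, not the guide's own definition) is a margin-based ranking loss over positive and negative scores:

# Hypothetical margin loss: each positive edge should outscore its
# negative samples by at least 1; pos_score has n entries, neg_score n*k.
def compute_loss(pos_score, neg_score):
    n = pos_score.shape[0]
    return (1 - pos_score.view(n, 1) + neg_score.view(n, -1)).clamp(min=0).mean()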
On heterogeneous graphs the representation model is once more unchanged:

class StochasticTwoLayerRGCN(nn.Module):
    def __init__(self, in_feat, hidden_feat, out_feat, rel_names):
        super().__init__()
        self.conv1 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(in_feat, hidden_feat, norm='right')
            for rel in rel_names})
        self.conv2 = dglnn.HeteroGraphConv({
            rel: dglnn.GraphConv(hidden_feat, out_feat, norm='right')
            for rel in rel_names})

    def forward(self, blocks, x):
        x = self.conv1(blocks[0], x)
        x = self.conv2(blocks[1], x)
        return x
Scores are computed per edge type with dgl.DGLHeteroGraph.apply_edges();

class ScorePredictor(nn.Module):
    def forward(self, edge_subgraph, x):
        with edge_subgraph.local_scope():
            edge_subgraph.ndata['x'] = x
            for etype in edge_subgraph.canonical_etypes:
                edge_subgraph.apply_edges(
                    dgl.function.u_dot_v('x', 'x', 'score'), etype=etype)
            return edge_subgraph.edata['score']

class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features, num_classes, etypes):
        super().__init__()
        self.rgcn = StochasticTwoLayerRGCN(
            in_features, hidden_features, out_features, etypes)
        self.pred = ScorePredictor()

    def forward(self, positive_graph, negative_graph, blocks, x):
        x = self.rgcn(blocks, x)
        pos_score = self.pred(positive_graph, x)
        neg_score = self.pred(negative_graph, x)
        return pos_score, neg_score
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
dataloader = dgl.dataloading.EdgeDataLoader(
g, train_eid_dict, sampler,
negative_sampler=dgl.dataloading.negative_sampler.Uniform(5),
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=4)
class NegativeSampler(object):
    def __init__(self, g, k):
        # caches the probability distribution per edge type
        self.weights = {
            etype: g.in_degrees(etype=etype).float() ** 0.75
            for etype in g.canonical_etypes}
        self.k = k

    def __call__(self, g, eids_dict):
        result_dict = {}
        for etype, eids in eids_dict.items():
            src, _ = g.find_edges(eids, etype=etype)
            src = src.repeat_interleave(self.k)
            # index the per-etype weights (a dict has no .multinomial)
            dst = self.weights[etype].multinomial(len(src), replacement=True)
            result_dict[etype] = (src, dst)
        return result_dict

dataloader = dgl.dataloading.EdgeDataLoader(
    g, train_eid_dict, sampler,
    negative_sampler=NegativeSampler(g, 5),
    batch_size=1024,
    shuffle=True,
    drop_last=False,
    num_workers=4)
model = Model(in_features, hidden_features, out_features, num_classes, etypes)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())
for input_nodes, positive_graph, negative_graph, blocks in dataloader:
blocks = [b.to(torch.device('cuda')) for b in blocks]
positive_graph = positive_graph.to(torch.device('cuda'))
negative_graph = negative_graph.to(torch.device('cuda'))
input_features = blocks[0].srcdata['features']
pos_score, neg_score = model(positive_graph, negative_graph, blocks, input_features)
loss = compute_loss(pos_score, neg_score)
opt.zero_grad()
loss.backward()
opt.step()
This is an advanced topic; writing your own neighborhood sampler is generally not recommended unless you are doing research in that direction;
The running example below is a graph whose nodes carry two features, x and y:
import torch
import dgl
src = torch.LongTensor(
[0, 0, 0, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5, 6, 7, 7, 8, 9, 10,
1, 2, 3, 3, 3, 4, 5, 5, 6, 5, 8, 6, 8, 9, 8, 11, 11, 10, 11])
dst = torch.LongTensor(
[1, 2, 3, 3, 3, 4, 5, 5, 6, 5, 8, 6, 8, 9, 8, 11, 11, 10, 11,
0, 0, 0, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5, 6, 7, 7, 8, 9, 10])
g = dgl.graph((src, dst))
g.ndata['x'] = torch.randn(12, 5)
g.ndata['y'] = torch.randn(12, 1)
The dgl library has built-in functions for generating frontiers, e.g. dgl.in_subgraph(), which induces a subgraph that keeps all nodes of the original graph but only the necessary edges (here, the in-edges of node 8):
frontier = dgl.in_subgraph(g, [8])
print(frontier.all_edges())
dgl.to_block(g, dst_nodes=None, include_dst_in_src=True) converts any frontier into a block; the argument g is the frontier subgraph itself:
output_nodes = torch.LongTensor([8])
block = dgl.to_block(frontier, output_nodes)
The number of input and output nodes can be queried with dgl.DGLHeteroGraph.number_of_src_nodes() and dgl.DGLHeteroGraph.number_of_dst_nodes():
num_input_nodes, num_output_nodes = block.number_of_src_nodes(), block.number_of_dst_nodes()
print(num_input_nodes, num_output_nodes)
The input node features of a block can be accessed via dgl.DGLHeteroGraph.srcdata and dgl.DGLHeteroGraph.srcnodes; the output node features via dgl.DGLHeteroGraph.dstdata and dgl.DGLHeteroGraph.dstnodes; srcdata/dstdata and srcnodes/dstnodes behave exactly like dgl.DGLHeteroGraph.ndata and dgl.DGLHeteroGraph.nodes do on an ordinary graph; features set on the frontier carry over, so they can also be read back through srcdata/dstdata:
block.srcdata['h'] = torch.randn(num_input_nodes, 5)
block.dstdata['h'] = torch.randn(num_output_nodes, 5)
print(block.srcdata['x'])
print(block.dstdata['y'])
dgl.NID and dgl.EID retrieve the original node/edge IDs of a block's input and output nodes (the IDs, not the features):
input_nodes = block.srcdata[dgl.NID]
output_nodes = block.dstdata[dgl.NID]
assert torch.equal(input_nodes[:len(output_nodes)], output_nodes)
The dgl library guarantees that the output nodes of a block always appear among its input nodes, and always occupy the very first positions of the input nodes, as the assertion above checks;
The output nodes passed to dgl.to_block must cover every destination node that has in-edges in the frontier, otherwise an error is raised (frontier2 below stands for a previously sampled frontier whose edges point into nodes 4, 5, 7, 8 and 11):
dgl.to_block(frontier2, torch.LongTensor([4, 5])) # ERROR
# Node 3 is an isolated node that do not have any edge pointing to it.
block3 = dgl.to_block(frontier2, torch.LongTensor([4, 5, 7, 8, 11, 3]))
print(block3.srcdata[dgl.NID])
print(block3.dstdata[dgl.NID])
Frontiers and blocks work on heterogeneous graphs as well:
hetero_frontier = dgl.heterograph({
('user', 'follow', 'user'): ([1, 3, 7], [3, 6, 8]),
('user', 'play', 'game'): ([5, 5, 4], [6, 6, 2]),
('game', 'played-by', 'user'): ([2], [6])
}, num_nodes_dict={'user': 10, 'game': 10})
hetero_block = dgl.to_block(hetero_frontier, {'user': [3, 6, 8], 'game': [2, 6]})
# input users and games
print(hetero_block.srcnodes['user'].data[dgl.NID], hetero_block.srcnodes['game'].data[dgl.NID])
# output users and games
print(hetero_block.dstnodes['user'].data[dgl.NID], hetero_block.dstnodes['game'].data[dgl.NID])
Take MultiLayerFullNeighborSampler as an example; its parent class is BlockSampler;
BlockSampler is responsible for generating the list of blocks starting from the last layer, through its sample_blocks() method; the default implementation iterates backwards, generating frontiers and converting them into blocks;
To customize a sampler it suffices to override the sample_frontier() method, which receives the layer for which a frontier should be generated, along with the original graph and the seed nodes whose representations are to be computed;
Here is a re-implementation of MultiLayerFullNeighborSampler:
class MultiLayerFullNeighborSampler(dgl.dataloading.BlockSampler):
def __init__(self, n_layers):
super().__init__(n_layers)
def sample_frontier(self, block_id, g, seed_nodes):
frontier = dgl.in_subgraph(g, seed_nodes)
return frontier
A MultiLayerNeighborSampler that samples at most fanout neighbors per layer (and everything when the fanout is None) looks like this:
class MultiLayerNeighborSampler(dgl.dataloading.BlockSampler):
def __init__(self, fanouts):
super().__init__(len(fanouts))
self.fanouts = fanouts
def sample_frontier(self, block_id, g, seed_nodes):
fanout = self.fanouts[block_id]
if fanout is None:
frontier = dgl.in_subgraph(g, seed_nodes)
else:
frontier = dgl.sampling.sample_neighbors(g, seed_nodes, fanout)
return frontier
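A quick usage sketch, assuming the g and train_nids defined earlier: sample at most 15 neighbors for the first layer and 10 for the second:

# Assumes g and train_nids from the earlier examples.
sampler = MultiLayerNeighborSampler([15, 10])
dataloader = dgl.dataloading.NodeDataLoader(
    g, train_nids, sampler,
    batch_size=1024, shuffle=True, drop_last=False, num_workers=0)
input_nodes, output_nodes, blocks = next(iter(dataloader))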
A sampler can also do something other than neighbor sampling, e.g. keep each in-edge independently with probability p:

class MultiLayerDropoutSampler(dgl.dataloading.BlockSampler):
    def __init__(self, p, n_layers):
        super().__init__(n_layers)
        self.p = p
        self.n_layers = n_layers

    def sample_frontier(self, block_id, g, seed_nodes, *args, **kwargs):
        # Get all inbound edges to `seed_nodes`
        src, dst = dgl.in_subgraph(g, seed_nodes).all_edges()
        # Randomly select edges with a probability of p
        # (the mask must be boolean so it selects rather than indexes)
        mask = torch.zeros_like(src).bernoulli_(self.p).bool()
        src = src[mask]
        dst = dst[mask]
        # Return a new graph with the same nodes as the original graph as a
        # frontier
        frontier = dgl.graph((src, dst), num_nodes=g.number_of_nodes())
        return frontier

    def __len__(self):
        return self.n_layers
Training with it looks the same as before (note that NodeDataLoader yields triples; the model here is the homogeneous two-layer GCN defined earlier):

sampler = MultiLayerDropoutSampler(0.5, 2)
dataloader = dgl.dataloading.NodeDataLoader(
    g, train_nids, sampler,
    batch_size=1024,
    shuffle=True,
    drop_last=False,
    num_workers=4)

model = StochasticTwoLayerGCN(in_features, hidden_features, out_features)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())

for input_nodes, output_nodes, blocks in dataloader:
    blocks = [b.to(torch.device('cuda')) for b in blocks]
    input_features = blocks[0].srcdata['features']
    output_labels = blocks[-1].dstdata['label']
    output_predictions = model(blocks, input_features)
    loss = compute_loss(output_labels, output_predictions)
    opt.zero_grad()
    loss.backward()
    opt.step()
Adapting this MultiLayerDropoutSampler to heterogeneous graphs means building an edge mask per edge type:

class MultiLayerDropoutSampler(dgl.dataloading.BlockSampler):
    def __init__(self, p, n_layers):
        super().__init__(n_layers)
        self.p = p
        self.n_layers = n_layers

    def sample_frontier(self, block_id, g, seed_nodes, *args, **kwargs):
        # Get all inbound edges to `seed_nodes`
        sg = dgl.in_subgraph(g, seed_nodes)
        new_edges_masks = {}
        # Iterate over all edge types
        for etype in sg.canonical_etypes:
            edge_mask = torch.zeros(sg.number_of_edges(etype))
            edge_mask.bernoulli_(self.p)
            new_edges_masks[etype] = edge_mask.bool()
        # Return a new graph with the same nodes as the original graph as a
        # frontier
        frontier = dgl.edge_subgraph(sg, new_edges_masks, preserve_nodes=True)
        return frontier

    def __len__(self):
        return self.n_layers
Whew, that last section was absurdly long, enough to make one sick of reading; thankfully the remaining two sections are back to a normal length…
Defining a custom GNN module here is essentially the same as in Chapter 3; the difference is that the module now has to operate on blocks, which works much like operating on whole graphs;
import dgl.function as fn

class CustomGraphConv(nn.Module):
def __init__(self, in_feats, out_feats):
super().__init__()
self.W = nn.Linear(in_feats * 2, out_feats)
def forward(self, g, h):
with g.local_scope():
g.ndata['h'] = h
g.update_all(fn.copy_u('h', 'm'), fn.mean('m', 'h_neigh'))
return self.W(torch.cat([g.ndata['h'], g.ndata['h_neigh']], 1))
class CustomGraphConv(nn.Module):
    def __init__(self, in_feats, out_feats):
        super().__init__()
        self.W = nn.Linear(in_feats * 2, out_feats)

    # h is now a pair of feature tensors for input and output nodes, instead of
    # a single feature tensor.
    # def forward(self, g, h):
    def forward(self, block, h):
        # with g.local_scope():
        with block.local_scope():
            # g.ndata['h'] = h
            h_src = h
            h_dst = h[:block.number_of_dst_nodes()]
            block.srcdata['h'] = h_src
            block.dstdata['h'] = h_dst
            # g.update_all(fn.copy_u('h', 'm'), fn.mean('m', 'h_neigh'))
            block.update_all(fn.copy_u('h', 'm'), fn.mean('m', 'h_neigh'))
            # return self.W(torch.cat([g.ndata['h'], g.ndata['h_neigh']], 1))
            return self.W(torch.cat(
                [block.dstdata['h'], block.dstdata['h_neigh']], 1))
In summary, to port a module from whole graphs to blocks:
The output node features are the first rows of the input features, and the number of output nodes is obtained via block.number_of_dst_nodes;
Replace g.ndata with block.srcdata (input nodes) or block.dstdata (output nodes), provided the graph is homogeneous (only one node type);
Replace g.nodes with block.srcnodes (input nodes) or block.dstnodes (output nodes), under the same proviso;
Replace g.number_of_nodes with block.number_of_src_nodes (input nodes) or block.number_of_dst_nodes (output nodes);
For heterogeneous graphs, take this full-graph CustomHeteroGraphConv as the starting point:

class CustomHeteroGraphConv(nn.Module):
    def __init__(self, g, in_feats, out_feats):
        super().__init__()
        self.Ws = nn.ModuleDict()
        self.Vs = nn.ModuleDict()
        for etype in g.canonical_etypes:
            utype, _, vtype = etype
            self.Ws[etype] = nn.Linear(in_feats[utype], out_feats[vtype])
        for ntype in g.ntypes:
            self.Vs[ntype] = nn.Linear(in_feats[ntype], out_feats[ntype])

    def forward(self, g, h):
        with g.local_scope():
            for ntype in g.ntypes:
                g.nodes[ntype].data['h_dst'] = self.Vs[ntype](h[ntype])
                g.nodes[ntype].data['h_src'] = h[ntype]
            for etype in g.canonical_etypes:
                utype, _, vtype = etype
                g.update_all(
                    fn.copy_u('h_src', 'm'), fn.mean('m', 'h_neigh'),
                    etype=etype)
                g.nodes[vtype].data['h_dst'] = g.nodes[vtype].data['h_dst'] + \
                    self.Ws[etype](g.nodes[vtype].data['h_neigh'])
            return {ntype: g.nodes[ntype].data['h_dst'] for ntype in g.ntypes}
And the block-adapted version, where h is a dict of (input, output) feature pairs per node type:

class CustomHeteroGraphConv(nn.Module):
    def __init__(self, g, in_feats, out_feats):
        super().__init__()
        self.Ws = nn.ModuleDict()
        self.Vs = nn.ModuleDict()
        for etype in g.canonical_etypes:
            utype, _, vtype = etype
            self.Ws[etype] = nn.Linear(in_feats[utype], out_feats[vtype])
        for ntype in g.ntypes:
            self.Vs[ntype] = nn.Linear(in_feats[ntype], out_feats[ntype])

    def forward(self, g, h):
        with g.local_scope():
            for ntype in g.ntypes:
                h_src, h_dst = h[ntype]
                g.dstnodes[ntype].data['h_dst'] = self.Vs[ntype](h_dst)
                g.srcnodes[ntype].data['h_src'] = h_src
            for etype in g.canonical_etypes:
                utype, _, vtype = etype
                g.update_all(
                    fn.copy_u('h_src', 'm'), fn.mean('m', 'h_neigh'),
                    etype=etype)
                g.dstnodes[vtype].data['h_dst'] = \
                    g.dstnodes[vtype].data['h_dst'] + \
                    self.Ws[etype](g.dstnodes[vtype].data['h_neigh'])
            return {ntype: g.dstnodes[ntype].data['h_dst'] for ntype in g.ntypes}
All message-passing modules built into the dgl library work on homogeneous graphs, uni-directional bipartite graphs (two node types and one edge type), and blocks with a single edge type; the input to a built-in dgl neural network module must be one of these three kinds of graphs;
When a pair of input/output features is given, dgl automatically treats the output node features as the first rows of the input node features;
Take dgl.nn.pytorch.SAGEConv as an example:

import dgl.function as fn

class SAGEConv(nn.Module):
    def __init__(self, in_feats, out_feats):
        super().__init__()
        self.W = nn.Linear(in_feats * 2, out_feats)

    def forward(self, g, h):
        if isinstance(h, tuple):
            h_src, h_dst = h
        elif g.is_block:
            h_src = h
            h_dst = h[:g.number_of_dst_nodes()]
        else:
            h_src = h_dst = h
        g.srcdata['h'] = h_src
        g.dstdata['h'] = h_dst
        g.update_all(fn.copy_u('h', 'm'), fn.sum('m', 'h_neigh'))
        return F.relu(
            self.W(torch.cat([g.dstdata['h'], g.dstdata['h_neigh']], 1)))
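A small sketch of calling this SAGEConv on a plain graph, on a block, and with an explicit (input, output) feature pair; the toy graph and sizes here are made up for illustration:

# Hypothetical toy inputs exercising the three supported input forms.
conv = SAGEConv(5, 4)

# 1. homogeneous graph with a single feature tensor
g1 = dgl.graph((torch.tensor([0, 1, 2]), torch.tensor([1, 2, 0])))
out1 = conv(g1, torch.randn(3, 5))

# 2. block, with features for all input nodes (output nodes come first)
block = dgl.to_block(dgl.in_subgraph(g1, [0]), torch.LongTensor([0]))
out2 = conv(block, torch.randn(block.number_of_src_nodes(), 5))

# 3. explicit (input, output) feature pair
out3 = conv(block, (torch.randn(block.number_of_src_nodes(), 5),
                    torch.randn(block.number_of_dst_nodes(), 5)))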
The SAGEConv above has thus been walked through completely; it works on all three kinds of graphs mentioned;
Offline (full-graph) inference can also reuse MultiLayerFullNeighborSampler, but sampling one layer at a time rather than all layers at once:

class StochasticTwoLayerGCN(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.hidden_features = hidden_features
        self.out_features = out_features
        self.conv1 = dgl.nn.GraphConv(in_features, hidden_features)
        self.conv2 = dgl.nn.GraphConv(hidden_features, out_features)
        self.n_layers = 2

    def forward(self, blocks, x):
        x_dst = x[:blocks[0].number_of_dst_nodes()]
        x = F.relu(self.conv1(blocks[0], (x, x_dst)))
        x_dst = x[:blocks[1].number_of_dst_nodes()]
        x = F.relu(self.conv2(blocks[1], (x, x_dst)))
        return x

    def inference(self, g, x, batch_size, device):
        """
        Offline inference with this module
        """
        # Compute representations layer by layer
        for l, layer in enumerate([self.conv1, self.conv2]):
            y = torch.zeros(g.number_of_nodes(),
                            self.hidden_features
                            if l != self.n_layers - 1
                            else self.out_features)
            sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
            dataloader = dgl.dataloading.NodeDataLoader(
                g, torch.arange(g.number_of_nodes()), sampler,
                batch_size=batch_size,
                shuffle=True,
                drop_last=False)

            # Within a layer, iterate over nodes in batches
            for input_nodes, output_nodes, blocks in dataloader:
                block = blocks[0]
                # Copy the features of necessary input nodes to GPU
                h = x[input_nodes].to(device)
                # Compute output. Note that this computation is the same
                # but only for a single layer.
                h_dst = h[:block.number_of_dst_nodes()]
                h = F.relu(layer(block, (h, h_dst)))
                # Copy output back to CPU.
                y[output_nodes] = h.cpu()

            x = y

        return y
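A usage sketch (an assumption, not from the guide; the feature name and batch size are made up):

# Assumes a trained model and full-graph node features in g.ndata['features'].
with torch.no_grad():
    reps = model.inference(g, g.ndata['features'], 1000, torch.device('cuda'))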
import dgl
import torch as th

dgl.distributed.initialize('ip_config.txt', num_servers, num_workers)
th.distributed.init_process_group(backend='gloo')
g = dgl.distributed.DistGraph('graph_name', 'part_config.json')
pb = g.get_partition_book()
train_nid = dgl.distributed.node_split(g.ndata['train_mask'], pb, force_even=True)

# Create sampler
sampler = NeighborSampler(g, [10, 25],
                          dgl.distributed.sample_neighbors,
                          device)

dataloader = DistDataLoader(
    dataset=train_nid.numpy(),
    batch_size=batch_size,
    collate_fn=sampler.sample_blocks,
    shuffle=True,
    drop_last=False)

# Define model and optimizer
model = SAGE(in_feats, num_hidden, n_classes, num_layers, F.relu, dropout)
model = th.nn.parallel.DistributedDataParallel(model)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.lr)

# training loop
for epoch in range(args.num_epochs):
    for step, blocks in enumerate(dataloader):
        batch_inputs, batch_labels = load_subtensor(g, blocks[0].srcdata[dgl.NID],
                                                    blocks[-1].dstdata[dgl.NID])
        batch_pred = model(blocks, batch_inputs)
        loss = loss_fcn(batch_pred, batch_labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
DGL provides DistGraph to access the partitioned graph data, DistEmbedding and DistTensor to access node/edge features and embeddings, and DistDataLoader to interact with samplers and obtain mini-batches;
Most readers will probably never need this chapter, and it is mostly prose, so the author only gives a rough translation; if interested, read the original through the links;
Details in the DGL official docs;
DGL needs to preprocess the graph data for distributed training; this involves two steps: partitioning the graph into subgraphs, and assigning new IDs to nodes and edges;
DGL provides a partitioning API that performs both steps; it supports random partitioning and Metis-based partitioning; the benefit of Metis is that it produces partitions with minimal edge cut, which reduces network communication during distributed training and inference;
DGL uses the latest version of Metis, with options optimized for real-world graphs with power-law degree distributions; after partitioning, the API stores the results in a format that is easy to load during training;
Note: the graph partitioning API currently runs on a single machine, so a very large graph requires a large machine to partition it; in the future DGL will support distributed graph partitioning;
By default, the partitioning API assigns new IDs to the nodes and edges of the input graph to help locate them during distributed training/inference; after assigning the IDs, it shuffles all node and edge data accordingly; during training, users simply work with the new node/edge IDs, but the original IDs remain accessible through g.ndata['orig_id'] and g.edata['orig_id'], where g is a DistGraph object (see the DistGraph section);
The partition results are stored as multiple files under the output directory; there is always a JSON file named xxx.json, where xxx is the graph name passed to the partitioning API, containing the entire partitioning configuration; if the API did not assign new IDs to nodes and edges, it also generates two extra NumPy files, node_map.npy and edge_map.npy, which map node/edge IDs to partition IDs; for graphs with billions of nodes and edges these arrays are large, since they hold one entry per node and per edge; inside each partition's folder, three files store the partition data in DGL format: graph.dgl stores the partition's graph structure along with some metadata on nodes and edges, while node_feats.dgl and edge_feats.dgl store all features of the nodes and edges belonging to that partition;
data_root_dir/
|-- xxx.json # partition configuration file in JSON
|-- node_map.npy # partition id of each node stored in a numpy array (optional)
|-- edge_map.npy # partition id of each edge stored in a numpy array (optional)
|-- part0/ # data for partition 0
|-- node_feats.dgl # node features stored in binary format
|-- edge_feats.dgl # edge features stored in binary format
|-- graph.dgl # graph structure of this partition stored in binary format
|-- part1/ # data for partition 1
|-- node_feats.dgl
|-- edge_feats.dgl
|-- graph.dgl
Specifying balance_ntypes in dgl.distributed.partition_graph() balances the number of nodes of each node type across partitions; users can exploit this by treating nodes in the training, validation and test sets as different node types:
dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test', balance_ntypes=g.ndata['train_mask'])
dgl.distributed.partition_graph() can also balance the in-degrees of nodes of different node types via balance_edges, which balances the number of edges incident to each type of node;
The graph name passed to dgl.distributed.partition_graph() is an important argument: dgl.distributed.DistGraph uses it to identify the distributed graph, and a legal graph name should contain only alphabetic characters and underscores;
Details of this section in the DGL official docs;
The main API index is at: dgl.distributed;
Code examples:
Initialize the DGL distributed module:
dgl.distributed.initialize('ip_config.txt', num_workers=4)
th.distributed.init_process_group(backend='gloo')
Distributed graph: dgl.distributed.DistGraph(graph_name, gpb=None, part_config=None);
Distributed mode v.s. standalone mode;
Creating a distributed graph:
import dgl
g = dgl.distributed.DistGraph('graph_name')
import dgl
g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json')
Accessing the graph structure:
print(g.number_of_nodes())
Accessing node and edge data:
g.ndata['train_mask']
<dgl.distributed.dist_graph.DistTensor at 0x7fec820937b8>
g.ndata['train_mask'][0]
tensor([1], dtype=torch.uint8)
Distributed tensors:
tensor = dgl.distributed.DistTensor((g.number_of_nodes(), 10), th.float32, name='test')
g.ndata['feat'] = tensor
data = g.ndata['feat'][[1, 2, 3]]
print(data)
g.ndata['feat'][[3, 4, 5]] = data
Distributed embeddings:
def initializer(shape, dtype):
arr = th.zeros(shape, dtype=dtype)
arr.uniform_(-1, 1)
return arr
emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10, init_func=initializer)
sparse_optimizer = dgl.distributed.SparseAdagrad([emb], lr=lr1)
optimizer = th.optim.Adam(model.parameters(), lr=lr2)
feats = emb(nids)
loss = model(feats)
loss.backward()
optimizer.step()
sparse_optimizer.step()
Distributed sampling:
def sample_blocks(seeds):
seeds = th.LongTensor(np.asarray(seeds))
blocks = []
for fanout in [10, 25]:
frontier = dgl.distributed.sample_neighbors(g, seeds, fanout, replace=True)
block = dgl.to_block(frontier, seeds)
seeds = block.srcdata[dgl.NID]
blocks.insert(0, block)
return blocks
dataloader = dgl.distributed.DistDataLoader(dataset=train_nid,
batch_size=batch_size,
collate_fn=sample_blocks,
shuffle=True)
for batch in dataloader:
...
sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
dataloader = dgl.dataloading.NodeDataLoader(g, train_nid, sampler,
batch_size=batch_size, shuffle=True)
for batch in dataloader:
...
Splitting the workload:
train_nids = dgl.distributed.node_split(g.ndata['train_mask'])
Details in the DGL official docs;
tools/copy_files.py copies the graph partitions to the machines of the cluster;
tools/launch.py launches the distributed training job on the cluster;
copy_files.py copies the partition data and related files (e.g. the training script) from the machine where the graph was partitioned to the cluster where distributed training will run, placing each partition on the machine whose training job will need it; the script takes the following arguments:
--part_config specifies the partition configuration file, which describes the partition data on the local machine;
--ip_config specifies the IP configuration file of the cluster;
--workspace specifies the directory on the training machines where all data related to distributed training is stored;
--rel_data_path specifies the relative path under the workspace directory where the partition data will be stored;
--script_folder specifies the relative path under the workspace directory where the user's training scripts are stored;
copy_files.py uses the IP configuration file to find the right machine for each partition, so copy_files.py and launch.py should use the same IP configuration file;
DGL provides tools/launch.py to launch the distributed training job; the script makes a number of assumptions about the setup (see the docs); an example invocation:
python3 tools/launch.py \
--workspace ~graphsage/ \
--num_trainers 2 \
--num_samplers 4 \
--num_servers 1 \
--part_config data/ogb-product.json \
--ip_config ip_config.txt \
"python3 code/train_dist.py --graph-name ogb-product --ip_config ip_config.txt --num-epochs 5 --batch-size 1000 --lr 0.1 --num_workers 4"
ip_config.txt contains the IP addresses of the machines in the cluster; a typical ip_config.txt looks like this:
172.31.19.1
172.31.23.205
172.31.29.175
172.31.16.98
On each machine, launch.py starts the given number of training processes (--num_trainers);
It also creates the given number of sampler processes per trainer (--num_samplers); the number of sampler processes must match the number of worker processes specified in initialize();
On the linked appendix pages, the official documentation lists a very large number of API functions; for reasons of time the author will not translate them one by one; a quick skim turned up quite a few interesting methods, e.g. the random-walk sampling methods in dgl.sampling; this chapter will therefore be updated from time to time with API usage worth recording from the author's own practice; for the full API, consult the source code and the module links in the appendix;
DGLGraph.add_edges(u, v, data=None, etype=None):
u (int, tensor, numpy.ndarray, list): source node IDs; u[i] is the source node of the i-th edge;
v (int, tensor, numpy.ndarray, list): destination node IDs; v[i] is the destination node of the i-th edge;
data (dict[str, tensor]): keys are feature names (commonly 'h' or 'w'), values are the feature values; row i of a value corresponds to the feature of the i-th edge;
etype (str or tuple of str): the type of the edges; can be omitted for a homogeneous graph (a single edge type);
g.add_edges(torch.tensor([0, 0]), torch.tensor([2, 2]), {'h': torch.tensor([[1.], [2.]]), 'w': torch.ones(2, 1)})
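A slightly fuller sketch; the starting graph and feature names are made up for illustration:

# Hypothetical example: start from a 3-node graph with one edge and append
# two new edges 0->2 carrying 'h' and 'w' features.
import dgl
import torch

g = dgl.graph((torch.tensor([0]), torch.tensor([1])), num_nodes=3)
g.edata['h'] = torch.zeros(1, 1)
g.edata['w'] = torch.zeros(1, 1)
g.add_edges(torch.tensor([0, 0]), torch.tensor([2, 2]),
            {'h': torch.tensor([[1.], [2.]]), 'w': torch.ones(2, 1)})
print(g.num_edges())  # 3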
Graphs created via dgl.DGLGraph or dgl.graph are all of type dgl.heterograph.DGLHeteroGraph;
The total number of nodes is determined by the largest node ID appearing in the src and dst arguments, not by the number of distinct IDs in src and dst; the number of edges is reported by num_edges;
e.g. building a graph from src = [1, 2, 3, 66, 1] and dst = [2, 3, 66, 1, 2] yields 67 nodes and 5 edges;
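A quick check of that claim:

import dgl
import torch

g = dgl.graph((torch.tensor([1, 2, 3, 66, 1]), torch.tensor([2, 3, 66, 1, 2])))
print(g.number_of_nodes(), g.number_of_edges())  # 67 5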
dgl.function
一节为参考;
send(edges, message_func)
: 根据给定的边计算消息;recv(nodes, reduce_func)
: 收集输入的消息, 进行消息聚合等其他操作;import dgl import dgl.function as fn import torch as th import numpy as np # 1 create a DGLGraph src = np.random.randint(0, 100, 500) dst = np.random.randint(0, 100, 500) g = dgl.graph((np.concatenate([src, dst]), np.concatenate([dst, src]))) # 2 set feature for nodes and edges g.ndata['h'] = th.randn((g.number_of_nodes(), 10)) # each node has feature size 10 g.edata['w'] = th.randn((g.number_of_edges(), 1)) # each edge has feature size 1 # 3 collect features from source nodes and aggregate them in destination nodes g.update_all(fn.copy_u('h', 'm'), fn.sum('m', 'h_sum')) print(g.ndata['m']) # error print(g.edata['m']) # error # 4 multiply source node features with edge weights and aggregate them in destination nodes g.update_all(fn.u_mul_e('h', 'w', 'm'), fn.max('m', 'h_max')) # 5 compute edge embedding by multiplying source and destination node embeddings g.apply_edges(fn.u_mul_v('h', 'h', 'w_new'))
Step 1 creates a random graph with (at most) 100 nodes (randint will not necessarily produce every ID below 100) and 500 edges, made undirected by concatenating the reversed edge list;
The update_all function is explained in Chapter 2 of this guide; it takes a message function, a reduce function, and an optional update function; step 3 copies each node's h feature into messages and then sets every node's h_sum feature to the sum of the m messages from its neighbors;
'm' is clearly an intermediate value: after update_all finishes, no feature named m exists anywhere; this is the fusion mentioned earlier, which avoids materializing explicit messages;
Step 4 multiplies source node h features with edge weights to produce the messages; fn.u_mul_e('h', 'w', 'm') is equivalent to the user-defined function:
def udf_u_mul_e(edges):
return {'m' : edges.src['h'] * edges.data['w']}
fn.max('m', 'h_max') is likewise equivalent to:
def udf_max(nodes):
return {'h_max' : th.max(nodes.mailbox['m'], 1)[0]}
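Both UDFs can be passed straight to update_all; under the assumptions above, this sketch is equivalent to step 4:

# Equivalent to g.update_all(fn.u_mul_e('h', 'w', 'm'), fn.max('m', 'h_max')),
# but with user-defined message and reduce functions (slower, since the
# built-ins are fused internally).
g.update_all(udf_u_mul_e, udf_max)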
copy_u, copy_src: identical in functionality and usage; parameters (src, out);
copy_e, copy_edge: identical in functionality and usage; parameters (edge, out);
The built-in binary message functions all take parameters (lhs_field, rhs_field, out):
u_add_v, u_sub_v, u_mul_v, u_div_v, u_dot_v;
u_add_e, u_sub_e, u_mul_e, u_div_e, u_dot_e;
v_add_u, v_sub_u, v_mul_u, v_div_u, v_dot_u;
v_add_e, v_sub_e, v_mul_e, v_div_e, v_dot_e;
e_add_u, e_sub_u, e_mul_u, e_div_u, e_dot_u;
e_add_v, e_sub_v, e_mul_v, e_div_v, e_dot_v;
src_mul_edge is an alternative spelling of u_mul_e, with parameters (src, edge, out);
The built-in reduce functions all take parameters (msg, out):
max, min;
sum, mean;
Two examples with the graph.apply_edges function:
graph.apply_edges(dgl.function.u_add_v('e', 'e', 'e'))
updates each edge's feature named 'e' (it need not already exist on the edges) to the sum of the 'e' feature of the edge's source node and the 'e' feature of its destination node;
graph.apply_edges(dgl.function.u_add_v('el', 'er', 'e'))
updates each edge's feature named 'e' (again, it need not already exist) to the sum of the source node's 'el' feature and the destination node's 'er' feature;
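A tiny runnable sketch of the second call (the toy graph and values are illustrative):

# Toy graph: 3 nodes, edges 0->1 and 1->2, with 'el'/'er' node features.
import dgl
import dgl.function as fn
import torch

graph = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))
graph.ndata['el'] = torch.ones(3, 1)
graph.ndata['er'] = torch.full((3, 1), 2.0)
graph.apply_edges(fn.u_add_v('el', 'er', 'e'))
print(graph.edata['e'])  # tensor([[3.], [3.]])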
Appendix: documentation links for the individual modules:
dgl: https://docs.dgl.ai/api/python/dgl.html
dgl.data: https://docs.dgl.ai/api/python/dgl.data.html
dgl.dataloading: https://docs.dgl.ai/api/python/dgl.dataloading.html
dgl.DGLGraph: https://docs.dgl.ai/api/python/dgl.DGLGraph.html
dgl.distributed: https://docs.dgl.ai/api/python/dgl.distributed.html
dgl.function: https://docs.dgl.ai/api/python/dgl.function.html
dgl.nn: https://docs.dgl.ai/api/python/nn.html
dgl.ops: https://docs.dgl.ai/api/python/dgl.ops.html
dgl.sampling: https://docs.dgl.ai/api/python/dgl.sampling.html