0%

GCN图卷积神经网络实战

图卷积神经网络实战

基本原理详情见

该代码为《深入浅出的图神经网络》配套代码,主要从数据处理,卷积操作设计,图卷积模型设计,模型训练逐行分析研究代码的运行和设计,适合新手入门。

首先介绍必须导入的库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#导入必要的库
import itertools # 提供了一系列用于高效循环的迭代器。这些迭代器适用于各种迭代任务,包括组合、分组和重复数据流的操作。
import os #os模块提供了与操作系统交互的功能,允许你执行文件系统操作,比如获取和改变当前工作目录、检查和修改文件权限、获取环境变量等。
import os.path as osp #是os模块的一部分,专门用于路径名操作。
import pickle #实现了一个算法,用于将Python对象序列化和反序列化。序列化过程中,Python对象被转换为一串字节,这允许你将对象保存到一个文件中或通过网络传输。反序列化则是这个过程的逆过程。
import urllib #是一个处理URL(统一资源定位符)和执行HTTP请求的模块。
from collections import namedtuple #namedtuple是collections模块中的一个工厂函数,用于创建子类的元组对象。

import numpy as np #python处理矩阵常用的库
import scipy.sparse as sp #邻接矩阵用稀疏矩阵形式存储 节省空间
import torch #pytorch的核心库,提供了张量的定义和各种操作的函数,比如将numpy放在GPU上(放在GPU上的矩阵就叫张量)。还包括了自动求导系统。
import torch.nn as nn #包括了神经网络的各种所需工具和层的定义,比如卷积层(Conv2d)、线性层(Linear)、激活函数(例如ReLU)、损失函数(如CrossEntropyloss)。
import torch.nn.functional as F #提供了一些nn模块中层的函数接口,但这些函数通常为无状态的,即不含可学习的参数。
import torch.nn.init as init #包含多种初始化方法来初始化神经网络的参数\权重
import torch.optim as optim #提供优化算法来更新神经网络中的参数,如SGD、Adam等
import matplotlib.pyplot as plt #matplotlib.pyplot是一个Python的绘图库,经常用于可视化数据和训练过程中的指标,如损失和精度。
%matplotlib inline#这是一个Jupyter notebook的魔法命令,用于在笔记本内部直接显示matplotlib生成的图像。使用这个命令后,每次调用绘图函数时,图像会直接嵌入到笔记本中,而不是在新窗口中打开。

数据集与预处理

该代码主要是读取数据集或者是当数据集不在的时候通过url来进行下载并处理,有一些奇特的函数设计可以学习其思想。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
Data = namedtuple('Data', ['x', 'y', 'adjacency',
'train_mask', 'val_mask', 'test_mask'])
#

def tensor_from_numpy(x, device): #将数据从数组格式转换为tensor格式 并转移到相关设备上
return torch.from_numpy(x).to(device)


class CoraData(object):
#数据集下载链接
download_url = "https://raw.githubusercontent.com/kimiyoung/planetoid/master/data"
#数据集中包含的文件名
filenames = ["ind.cora.{}".format(name) for name in
['x', 'tx', 'allx', 'y', 'ty', 'ally', 'graph', 'test.index']]

def __init__(self, data_root="cora", rebuild=False):
"""Cora数据,包括数据下载,处理,加载等功能
当数据的缓存文件存在时,将使用缓存文件,否则将下载、进行处理,并缓存到磁盘
处理之后的数据可以通过属性 .data 获得,它将返回一个数据对象,包括如下几部分:
* x: 节点的特征,维度为 2708 * 1433,类型为 np.ndarray
* y: 节点的标签,总共包括7个类别,类型为 np.ndarray
* adjacency: 邻接矩阵,维度为 2708 * 2708,类型为 scipy.sparse.coo.coo_matrix
* train_mask: 训练集掩码向量,维度为 2708,当节点属于训练集时,相应位置为True,否则False
* val_mask: 验证集掩码向量,维度为 2708,当节点属于验证集时,相应位置为True,否则False
* test_mask: 测试集掩码向量,维度为 2708,当节点属于测试集时,相应位置为True,否则False
Args:
-------
data_root: string, optional
存放数据的目录,原始数据路径: {data_root}/raw
缓存数据路径: {data_root}/processed_cora.pkl
rebuild: boolean, optional
是否需要重新构建数据集,当设为True时,如果存在缓存数据也会重建数据
"""
self.data_root = data_root
save_file = osp.join(self.data_root, "processed_cora.pkl")
if osp.exists(save_file) and not rebuild: #使用缓存数据
print("Using Cached file: {}".format(save_file))
self._data = pickle.load(open(save_file, "rb"))
else:
self.maybe_download() #下载或使用原始数据集
self._data = self.process_data() #数据预处理
with open(save_file, "wb") as f: #把处理好的数据保存为缓存文件.pkl 下次直接使用
pickle.dump(self.data, f)
print("Cached file: {}".format(save_file))

@property
def data(self):
"""返回Data数据对象,包括x, y, adjacency, train_mask, val_mask, test_mask"""
return self._data

def process_data(self):
"""
处理数据,得到节点特征和标签,邻接矩阵,训练集、验证集以及测试集
引用自:https://github.com/rusty1s/pytorch_geometric
"""
print("Process data ...")
#读取下载的数据文件
_, tx, allx, y, ty, ally, graph, test_index = [self.read_data(
osp.join(self.data_root, "raw", name)) for name in self.filenames]

train_index = np.arange(y.shape[0]) #训练集索引
val_index = np.arange(y.shape[0], y.shape[0] + 500)#验证集索引
sorted_test_index = sorted(test_index) #测试集索引

x = np.concatenate((allx, tx), axis=0) #节点特征 N*D 2708*1433
y = np.concatenate((ally, ty), axis=0).argmax(axis=1)#节点对应的标签 2708

x[test_index] = x[sorted_test_index]
y[test_index] = y[sorted_test_index]
num_nodes = x.shape[0] #节点数/数据量 2708

#训练、验证、测试集掩码
#初始化为0
train_mask = np.zeros(num_nodes, dtype=np.bool)
val_mask = np.zeros(num_nodes, dtype=np.bool)
test_mask = np.zeros(num_nodes, dtype=np.bool)

train_mask[train_index] = True
val_mask[val_index] = True
test_mask[test_index] = True

#构建邻接矩阵
adjacency = self.build_adjacency(graph)
print("Node's feature shape: ", x.shape) #(N*D)
print("Node's label shape: ", y.shape)#(N,)
print("Adjacency's shape: ", adjacency.shape) #(N,N)
#训练、验证、测试集各自的大小
print("Number of training nodes: ", train_mask.sum())
print("Number of validation nodes: ", val_mask.sum())
print("Number of test nodes: ", test_mask.sum())

return Data(x=x, y=y, adjacency=adjacency,
train_mask=train_mask, val_mask=val_mask, test_mask=test_mask)

def maybe_download(self):
#原始数据保存路径
save_path = os.path.join(self.data_root, "raw")
#下载相应的文件
for name in self.filenames:
if not osp.exists(osp.join(save_path, name)):
self.download_data(
"{}/{}".format(self.download_url, name), save_path)

@staticmethod
def build_adjacency(adj_dict):
"""根据下载的邻接表创建邻接矩阵"""
edge_index = []
num_nodes = len(adj_dict)
for src, dst in adj_dict.items():
edge_index.extend([src, v] for v in dst)
edge_index.extend([v, src] for v in dst)
# 去除重复的边
edge_index = list(k for k, _ in itertools.groupby(sorted(edge_index)))
edge_index = np.asarray(edge_index)
#稀疏矩阵 存储非0值 节省空间
adjacency = sp.coo_matrix((np.ones(len(edge_index)),
(edge_index[:, 0], edge_index[:, 1])),
shape=(num_nodes, num_nodes), dtype="float32")
return adjacency

@staticmethod
def read_data(path):
"""使用不同的方式读取原始数据以进一步处理"""
name = osp.basename(path)
if name == "ind.cora.test.index":
out = np.genfromtxt(path, dtype="int64")
return out
else:
out = pickle.load(open(path, "rb"), encoding="latin1")
out = out.toarray() if hasattr(out, "toarray") else out #这一行的意思是如果out是密集矩阵则不变 否则的话转化为密集矩阵
return out

@staticmethod
def download_data(url, save_path):
"""数据下载工具,当原始数据不存在时将会进行下载"""
if not os.path.exists(save_path):
os.makedirs(save_path)
data = urllib.request.urlopen(url)
filename = os.path.split(url)[-1]

with open(os.path.join(save_path, filename), 'wb') as f:
f.write(data.read())

return True

@staticmethod
def normalization(adjacency):
"""计算 L=D^-0.5 * (A+I) * D^-0.5"""
adjacency += sp.eye(adjacency.shape[0]) # 增加自连接 不仅考虑邻接节点特征 还考虑节点自身的特征
degree = np.array(adjacency.sum(1)) #此时的度矩阵的对角线的值 为 邻接矩阵 按行求和
d_hat = sp.diags(np.power(degree, -0.5).flatten()) #对度矩阵对角线的值取-0.5次方 再转换为对角矩阵
return d_hat.dot(adjacency).dot(d_hat).tocoo() #归一化的拉普拉斯矩阵 稀疏存储 节省空间

图卷积层定义

主要是因为pytorch中只有传统的卷积,没有图卷积操作(图卷积和传统的卷积不太一样,没有感受野的概念)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class GraphConvolution(nn.Module):
def __init__(self, input_dim, output_dim, use_bias=True):
"""图卷积:L*X*\theta

Args:
----------
input_dim: int
节点输入特征的维度
output_dim: int
输出特征维度
use_bias : bool, optional
是否使用偏置
"""
super(GraphConvolution, self).__init__()
#下面三行设置了卷积操作的输入和输出以及是否使用偏置
self.input_dim = input_dim
self.output_dim = output_dim
self.use_bias = use_bias
#nn.Parameter表示这个数组是可训练的,torch.Tensor(input_dim, output_dim)表示创建了一个input_dim x output_dim的张量
self.weight = nn.Parameter(torch.Tensor(input_dim, output_dim))
if self.use_bias:
self.bias = nn.Parameter(torch.Tensor(output_dim))
else:
self.register_parameter('bias', None)
self.reset_parameters()
#初始化参数
def reset_parameters(self):
init.kaiming_uniform_(self.weight)
if self.use_bias:
init.zeros_(self.bias)

def forward(self, adjacency, input_feature):
"""邻接矩阵是稀疏矩阵,因此在计算时使用稀疏矩阵乘法

Args:
-------
adjacency: torch.sparse.FloatTensor
邻接矩阵
input_feature: torch.Tensor
输入特征
"""
#将输入特征和矩阵相乘
support = torch.mm(input_feature, self.weight)
#输出和邻接矩阵相乘(因为图卷积的感受野是跟邻接矩阵相关的)
output = torch.sparse.mm(adjacency, support)
if self.use_bias:
output += self.bias
return output
#当你调用内置的 repr() 函数时,或者当你尝试在交互式解释器中打印对象时,就会调用这个方法。它的目的是返回一个对象的准确表示,通常可以用这个字符串表示来重新创建该对象。
def __repr__(self):
return self.__class__.__name__ + ' (' \
+ str(self.input_dim) + ' -> ' \
+ str(self.output_dim) + ')'

模型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
class GcnNet(nn.Module):
"""
定义一个包含两层GraphConvolution的模型
"""
def __init__(self, input_dim=1433):
super(GcnNet, self).__init__()
self.gcn1 = GraphConvolution(input_dim, 16)
self.gcn2 = GraphConvolution(16, 7)

def forward(self, adjacency, feature):
h = F.relu(self.gcn1(adjacency, feature))
logits = self.gcn2(adjacency, h)
return logits

模型训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# 超参数定义
LEARNING_RATE = 0.1
WEIGHT_DACAY = 5e-4
EPOCHS = 200
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# 加载数据,并转换为torch.Tensor
dataset = CoraData().data
node_feature = dataset.x / dataset.x.sum(1, keepdims=True) # 归一化数据,使得每一行和为1
tensor_x = tensor_from_numpy(node_feature, DEVICE)
tensor_y = tensor_from_numpy(dataset.y, DEVICE)
tensor_train_mask = tensor_from_numpy(dataset.train_mask, DEVICE)
tensor_val_mask = tensor_from_numpy(dataset.val_mask, DEVICE)
tensor_test_mask = tensor_from_numpy(dataset.test_mask, DEVICE)
normalize_adjacency = CoraData.normalization(dataset.adjacency) # 规范化邻接矩阵

num_nodes, input_dim = node_feature.shape
indices = torch.from_numpy(np.asarray([normalize_adjacency.row,normalize_adjacency.col]).astype('int64')).long()
values = torch.from_numpy(normalize_adjacency.data.astype(np.float32))
tensor_adjacency = torch.sparse.FloatTensor(indices, values,(num_nodes, num_nodes)).to(DEVICE)
# 模型定义:Model, Loss, Optimizer
model = GcnNet(input_dim).to(DEVICE)
criterion = nn.CrossEntropyLoss().to(DEVICE)
optimizer = optim.Adam(model.parameters(),lr=LEARNING_RATE,weight_decay=WEIGHT_DACAY)
# 训练主体函数
def train():
loss_history = []
val_acc_history = []
model.train()
train_y = tensor_y[tensor_train_mask]
for epoch in range(EPOCHS):
logits = model(tensor_adjacency, tensor_x) # 前向传播
train_mask_logits = logits[tensor_train_mask] # 只选择训练节点进行监督
loss = criterion(train_mask_logits, train_y) # 计算损失值
optimizer.zero_grad()
loss.backward() # 反向传播计算参数的梯度
optimizer.step() # 使用优化方法进行梯度更新
train_acc, _, _ = test(tensor_train_mask) # 计算当前模型训练集上的准确率
val_acc, _, _ = test(tensor_val_mask) # 计算当前模型在验证集上的准确率
# 记录训练过程中损失值和准确率的变化,用于画图
loss_history.append(loss.item())
val_acc_history.append(val_acc.item())
print("Epoch {:03d}: Loss {:.4f}, TrainAcc {:.4}, ValAcc {:.4f}".format(
epoch, loss.item(), train_acc.item(), val_acc.item()))

return loss_history, val_acc_history
# 测试函数
def test(mask):
model.eval()
with torch.no_grad():
logits = model(tensor_adjacency, tensor_x)
test_mask_logits = logits[mask]
predict_y = test_mask_logits.max(1)[1]
accuarcy = torch.eq(predict_y, tensor_y[mask]).float().mean()
return accuarcy, test_mask_logits.cpu().numpy(), tensor_y[mask].cpu().numpy()
def plot_loss_with_acc(loss_history, val_acc_history):
fig = plt.figure()
ax1 = fig.add_subplot(111)
ax1.plot(range(len(loss_history)), loss_history,
c=np.array([255, 71, 90]) / 255.)
plt.ylabel('Loss')

ax2 = fig.add_subplot(111, sharex=ax1, frameon=False)
ax2.plot(range(len(val_acc_history)), val_acc_history,
c=np.array([79, 179, 255]) / 255.)
ax2.yaxis.tick_right()
ax2.yaxis.set_label_position("right")
plt.ylabel('ValAcc')

plt.xlabel('Epoch')
plt.title('Training Loss & Validation Accuracy')
plt.show()

训练+输出

1
2
3
loss, val_acc = train()
test_acc, test_logits, test_label = test(tensor_test_mask)
print("Test accuarcy: ", test_acc.item())

绘图

1
plot_loss_with_acc(loss, val_acc)
1
2
3
4
5
6
7
8
9
10
# 绘制测试数据的TSNE降维图
from sklearn.manifold import TSNE
tsne = TSNE()
out = tsne.fit_transform(test_logits)
fig = plt.figure()
for i in range(7):
indices = test_label == i
x, y = out[indices].T
plt.scatter(x, y, label=str(i))
plt.legend()