3.1 神经网络的核心组件和主要工具

"""
<===== 3.1 神经网络的主要组件 =====>
神经网络的核心组件包括:
1. 层(Layers):神经网络由多个层组成,每一层包含若干神经元(Neurons),层是神经网络的基本结构,将输入数据转换为输出数据。
2. 模型(Models):模型是由多个层组成的整体结构,定义了数据如何在层之间流动以及如何进行计算。
3. 损失函数(Loss Functions):参数学习的目标函数,通过最小化损失函数来优化模型参数,使模型的预测结果更接近真实值。
4. 优化器(Optimizers):用于调整模型参数以最小化损失函数的算法。
多个层链接在一起构成一个模型或者网络,输入数据通过这个模型转换为预测值。预测值与真实值共同构成损失函数的输入,损失函数的损失值用于衡量预测值与目标结果的 \
匹配或相似程度。优化器可以利用损失值来更新模型参数,目的是让损失值更小,从而提升模型的预测能力。
这是一个循环迭代的过程,当损失值足够小或者达到预设的迭代次数时,训练过程结束。
可以利用 PyTorch 中的 nn 工具箱来构建神经网络的主要组件。

<----- 构建神经网络的主要工具 ----->
<----- nn.Module ----->
nn.Module 是 PyTorch 中所有神经网络模块的基类,几乎所有的神经网络组件(如层、模型等)都继承自 nn.Module。通过继承 nn.Module,可以方便的定义和管理神经网络的结构和参数。
nn 中已经实现了绝大多数层,比如 nn.Linear(全连接层)、nn.Conv2d(二维卷积层)、nn.LSTM(长短期记忆网络)、损失层、激活层等。
这些层都是 nn.Module 的子类,可以自动检测自己的参数,并将其作为学习参数。
nn 中继承了 nn.Module 的层名称一般为 nn.Xxxx(第一个字母大写)
<----- nn.functional ----->
另一类常用的神经网络组件是 nn.functional 中的函数,这些函数通常用于实现一些不包含学习参数的操作,比如激活函数(如 ReLU、Sigmoid)、池化操作(如 MaxPool、AvgPool)等。
从功能方面基于 nn.Module 实现的层 nn.functional 也可以实现,反之亦然。在具体使用过程中两者的主要区别如下:
1) nn.Xxx 继承于 nn.Module,需要先实例化并传入参数,然后以函数调用的方式调用实例化的对象并传入输入数据。可以很好地与 nn.Sequential 结合使用,而 nn.functional 不能与 nn.Sequential 结合使用。
2) nn.Xxx 不需要自己定义和管理 weight、bias 等学习参数,而 nn.functional 需要自己定义这些参数,每次调用时都需要手动传入这些参数,不方便代码的复用。
3) dropout 操作在训练和测试阶段是有区别的,使用 nn.Xxx 方式定义 dropout,在调用 model.eval() 之后自动实现状态的转换,而 nn.functional 方式需要手动指定训练和测试状态。

总的来说,nn.Module 更适合定义包含学习参数的层(比如 conv2d、linear、batch_norm、dropout),而 nn.functional 更适合定义不包含学习参数的操作(比如 maxpool、loss func、activation func)。
"""

3.2 构建模型

import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import OrderedDict

# <===== 3.2 构建模型 =====>
"""
使用 PyTorch 构建模型的方法大致有 3 种:
1) 继承 nn.Module 基类构建模型
2) 使用 nn.Sequential 容器按层顺序构建模型
3) 继承 nn.Module 基类构建模型,再使用相关容器(如 nn.Sequential、nn.ModuleList、nn.ModuleDict 等)进行封装
"""


# <----- 3.3.1 继承 nn.Module 基类构建模型 ----->
# 利用这种方法构建模型,先定义一个类,使之继承 nn.Module 基类。把模型中需要用到的层放在构造函数 __init__() 中,在 forward 方法中实现模型前向传播。
class Model_Seq(nn.Module):
"""
通过继承基类 nn.Module 来构建模型
"""

def __init__(self, in_dim, n_hidden_1, n_hidden_2, out_dim):
super(Model_Seq, self).__init__()
self.flatten = nn.Flatten()
self.linear1 = nn.Linear(in_dim, n_hidden_1)
self.bn1 = nn.BatchNorm1d(n_hidden_1)
self.linear2 = nn.Linear(n_hidden_1, n_hidden_2)
self.bn2 = nn.BatchNorm1d(n_hidden_2)
self.out = nn.Linear(n_hidden_2, out_dim)

def forward(self, x):
x = self.flatten(x)
x = self.linear1(x)
x = self.bn1(x)
x = F.relu(x)
x = self.linear2(x)
x = self.bn2(x)
x = F.relu(x)
x = self.out(x)
x = F.softmax(x, dim=1)
return x


# 对超参数赋值
in_dim, n_hidden_1, n_hidden_2, out_dim = 28 * 28, 300, 100, 10
model_seq = Model_Seq(in_dim, n_hidden_1, n_hidden_2, out_dim)
print(model_seq)

# <----- 3.3.2 使用 nn.Sequential 容器按层顺序构建模型 ----->
# 使用 nn.Sequential 构建模型,因为其内部实现了 forward 函数,因此可以不用重写。
# nn.Sequential 里面的模块是按照先后顺序进行排列的,所以必须确保前一个模块的输出大小和下一个模块的输入大小是一致的。
# 这种方法一般适合比较简单的模型。
# 1) 利用可变参数 nn.Sequential(*args)
Seq_arg = nn.Sequential(
nn.Flatten(),
nn.Linear(in_dim, n_hidden_1),
nn.BatchNorm1d(n_hidden_1),
nn.ReLU(),
nn.Linear(n_hidden_1, n_hidden_2),
nn.BatchNorm1d(n_hidden_2),
nn.ReLU(),
nn.Linear(n_hidden_2, out_dim),
nn.Softmax(dim=1)
)
print(Seq_arg) # 继续使用上面定义的超参数
# 这种方法不能给每一层命名,如果需要给每一层命名,可以使用下面: add_module 方法和 OrderedDict 方法

# 2) 使用 add_module 方法
in_dim, n_hidden_1, n_hidden_2, out_dim = 25 * 25, 300, 100, 10 # 重新定义超参数
Seq_module = nn.Sequential()
Seq_module.add_module('flatten', nn.Flatten())
Seq_module.add_module('linear1', nn.Linear(in_dim, n_hidden_1))
Seq_module.add_module('bn1', nn.BatchNorm1d(n_hidden_1))
Seq_module.add_module('relu1', nn.ReLU())
Seq_module.add_module('linear2', nn.Linear(n_hidden_1, n_hidden_2))
Seq_module.add_module('bn2', nn.BatchNorm1d(n_hidden_2))
Seq_module.add_module('relu2', nn.ReLU())
Seq_module.add_module('out', nn.Linear(n_hidden_2, out_dim))
Seq_module.add_module('softmax', nn.Softmax(dim=1))
print(Seq_module)

# 3) 使用 OrderedDict 方法
Seq_dict = nn.Sequential(
OrderedDict(
[
('flatten', nn.Flatten()),
('linear1', nn.Linear(in_dim, n_hidden_1)),
('bn1', nn.BatchNorm1d(n_hidden_1)),
('relu1', nn.ReLU()),
('linear2', nn.Linear(n_hidden_1, n_hidden_2)),
('bn2', nn.BatchNorm1d(n_hidden_2)),
('relu2', nn.ReLU()),
('out', nn.Linear(n_hidden_2, out_dim)),
('softmax', nn.Softmax(dim=1))
]
)
)
print(Seq_dict)


# <----- 3.3.3 继承 nn.Module 基类并使用模型容器来构建模型 ----->
# 当模型的结构比较复杂时,可以应用模型容器来对模型的部分结构进行封装,以增强模型的可读性。
# 1) 使用 nn.Sequential 模型容器
class Model_lay(nn.Module):
"""
使用 nn.Sequential 模型容器来构建模型,Sequential 函数的功能是将网络的层组合到一起。
"""

def __init__(self, in_dim, n_hidden1, n_hidden2, out_dim):
super(Model_lay, self).__init__()
self.flatten = nn.Flatten()
self.layer1 = nn.Sequential(nn.Linear(in_dim, n_hidden1),
nn.BatchNorm1d(n_hidden1),
nn.ReLU())
self.layer2 = nn.Sequential(nn.Linear(n_hidden1, n_hidden2),
nn.BatchNorm1d(n_hidden2),
nn.ReLU())
self.out = nn.Linear(n_hidden2, out_dim)

def forward(self, x):
x = self.flatten(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.out(x)
x = F.softmax(x, dim=1)
return x


print(Model_lay)


# 2) 使用 nn.ModuleList 模型容器
class Model_lst(nn.Module):
"""
使用 nn.ModuleList 模型容器来构建模型,ModuleList 可以存储任意数量的 nn.Module 子类。
"""
def __init__(self, in_dim, n_hidden1, n_hidden2, out_dim):
super(Model_lst, self).__init__()
self.layers = nn.ModuleList([
nn.Flatten(),
nn.Linear(in_dim, n_hidden1),
nn.BatchNorm1d(n_hidden1),
nn.ReLU(),
nn.Linear(n_hidden1, n_hidden2),
nn.BatchNorm1d(n_hidden2),
nn.ReLU(),
nn.Linear(n_hidden2, out_dim),
nn.Softmax(dim=1)
])

def forward(self, x):
for index, layer in enumerate(self.layers):
print(f'第 {index} 层: {layer}')
x = layer(x)
return x


data = torch.randn(32, 25, 25)
model_lst = Model_lst(25 * 25, 300, 100, 10)
out = model_lst(data)


# 3) 使用 mm.ModuleDict 模型容器
class Model_dict(nn.Module):
def __init__(self, in_dim, n_hidden1, n_hidden2, out_dim):
super(Model_dict, self).__init__()
self.layers_dict = nn.ModuleDict({
'flatten': nn.Flatten(),
'linear1': nn.Linear(in_dim, n_hidden1),
'bn1': nn.BatchNorm1d(n_hidden1),
'relu': nn.ReLU(), # ReLU 层可以复用
'linear2': nn.Linear(n_hidden1, n_hidden2),
'bn2': nn.BatchNorm1d(n_hidden2),
'out': nn.Linear(n_hidden2, out_dim),
'softmax': nn.Softmax(dim=1)
})

def forward(self, x):
layers = ['flatten', 'linear1', 'bn1', 'relu', 'linear2', 'bn2', 'relu', 'out', 'softmax']
for layer in layers:
x = self.layers_dict[layer](x)
return x


# <----- 3.3.4 自定义网络模块 ----->
# 实现 ResNet
# 定义残差块
class ResNetBasicBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride=1, downsample=None):
super(ResNetBasicBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
self.bn1 = nn.BatchNorm2d(out_channels)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
self.bn2 = nn.BatchNorm2d(out_channels)

def forward(self, x):
output = self.conv1(x)
output = self.bn1(output)
output = F.relu(output)
output = self.conv2(output)
output = self.bn2(output)
return F.relu(output + x) # 残差连接


class ResNetDownBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride, downsample=None):
super(ResNetDownBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride[0], padding=1)
self.bn1 = nn.BatchNorm2d(out_channels)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride[1], padding=1)
self.bn2 = nn.BatchNorm2d(out_channels)
self.extra = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride[0], padding=0),
nn.BatchNorm2d(out_channels)
)

def forward(self, x):
extra_x = self.extra(x) # 残差

output = self.conv1(x)
output = self.bn1(output)
output = F.relu(output)
output = self.conv2(output)
output = self.bn2(output)
return F.relu(output + extra_x)


# 组合这两个模块得到 ResNet18
class ResNet18(nn.Module):
def __init__(self):
super(ResNet18, self).__init__()
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
self.bn1 = nn.BatchNorm2d(64)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = nn.Sequential(
ResNetBasicBlock(64, 64, 1),
ResNetBasicBlock(64, 64, 1)
)
self.layer2 = nn.Sequential(
ResNetDownBlock(64, 128, [2, 1]),
ResNetBasicBlock(128, 128, 1)
)
self.layer3 = nn.Sequential(
ResNetDownBlock(128, 256, [2, 1]),
ResNetBasicBlock(256, 256, 1)
)
self.layer4 = nn.Sequential(
ResNetDownBlock(256, 512, [2, 1]),
ResNetBasicBlock(512, 512, 1)
)
self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
self.fc = nn.Linear(512, 10)

def forward(self, x):
out = self.conv1(x)
out = self.layer1(out)
out = self.layer2(out)
out = self.layer3(out)
out = self.layer4(out)
out = self.avgpool(out)
out = out.reshape(x.shape[0], -1)
out = self.fc(out)
return out


model_resnet18 = ResNet18()
print(model_resnet18)
data = torch.randn(32, 3, 224, 224)
result = model_resnet18(data)
print(result.shape) # torch.Size([32, 10])

3.3 训练模型

"""
<===== 3.3 训练模型 ======>
PyTorch 训练模型主要包括加载数据集、定义模型、定义损失函数和优化器、反向传播、参数更新等主要步骤:
1) 加载和预处理数据集:利用 torch.utils.data.DataLoader 可以方便地加载和迭代数据集。使用 torchvision.transforms 对数据进行预处理,如归一化、数据增强等。
2) 定义损失函数。可以通过自定义方法或者使用 PyTorch 内置的损失函数定义损失函数,如回归使用 nn.MSELoss(),分类使用 nn.BCELoss。
3) 定义优化方法,PyTorch 常用的优化方法都封装在 torch.optim 中,设计很灵活,可以拓展为自定义的优化方法。所有的优化方法都是继承了基类 optim.Optimizer,并实现了自己的优化步骤。
4) 循环训练模型。
设置为训练模式: model.train(),启用 BatchNorm 和 Dropout。
梯度清零: optimizer.zero_grad(),清除上一步的残余更新参数值。
求损失值: pred = model(x), loss = loss_func(pred, true)
自动求导,实现梯度的反向传播: loss.backward
更新参数: optimizer.step()
5) 循环测试或验证模型:
设置为测试或验证模式: model.eval(),调用 model.eval() 会把所有的 training 属性设置为 False;
在不跟踪梯度的模式下计算损失值、预测值等: with torch.no_grad()。
6) 可视化结果。

tips: model.train() 和 model.eval()
如果模型中有批归一化 (Batch Normalization, BH) 和 dropout 层,需要在训练时添加 model.train(),在测试时添加 model.eval()。
其中 model.train() 是保证 BN 层用每一批数据的均值和方差,而 model.eval() 是保证 BN 层用全部训练数据的均值和方差。
而对于 dropout 层,model.train() 是随机选取一部分网络链接来训练参数,而 model.eval() 是利用全部网络链接来进行前向传播。
"""

3.4 实现神经网络实例

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision.datasets import mnist # 导入 PyTorch 自带的 MNIST 数据集
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

# <===== 3.4 实现神经网络实例 =====>
"""
完成 MNIST 数据分类:
1. 利用 PyTorch 内置函数 mnist 下载数据
2. 利用 torchvision 对数据预处理,调用 torch.utils 建立一个数据迭代器
3. 可视化原数据
4. 利用 nn 工具箱构建神经网络模型
5. 实例化模型,定义损失函数和优化器
6. 训练模型
7. 可视化结果
"""

# 定义一些超参数
train_batch_size = 64
test_batch_size = 128
num_epoches = 50
lr = 1e-2
momentum = 0.5

# 下载数据并对数据进行预处理
# 定义预处理函数,transforms.Compose 可以把一些转换函数组合在一起
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize([0.5], [0.5])
])
# 下载数据集并对数据预处理
train_dataset = mnist.MNIST('./data/', train=True, transform=transform, download=False)
test_dataset = mnist.MNIST('./data/', train=False, transform=transform, download=False)
# 得到一个生成器
train_loader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=False)

# 可视化原数据
examples = enumerate(test_loader)
batch_idx, (example_data, example_targets) = next(examples)
fig = plt.figure()
for i in range(6):
plt.subplot(2, 3, i + 1)
plt.tight_layout()
plt.imshow(example_data[i][0], cmap='gray', interpolation='none')
plt.title("Ground Truth: {}".format(example_targets[i]))
plt.xticks([])
plt.yticks([])
plt.show()


# 构建模型
class Net(nn.Module):
def __init__(self, in_dim, n_hidden1, n_hidden2, out_dim):
super(Net, self).__init__()
self.flatten = nn.Flatten()
self.layer1 = nn.Sequential(
nn.Linear(in_dim, n_hidden1),
nn.BatchNorm1d(n_hidden1),
)
self.layer2 = nn.Sequential(
nn.Linear(n_hidden1, n_hidden2),
nn.BatchNorm1d(n_hidden2),
)
self.out = nn.Linear(n_hidden2, out_dim)

def forward(self, x):
x = self.flatten(x)
x = F.relu(self.layer1(x))
x = F.relu(self.layer2(x))
x = F.softmax(self.out(x), dim=1)
return x


# 实例化网络
device = "cuda" if torch.cuda.is_available() else "cpu"
model = Net(28 * 28, 300, 100, 10).to(device)
# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)

# 训练模型
losses = []
accuracy = []
eval_losses = []
eval_accuracy = []

for epoch in range(num_epoches):
train_loss, train_acc = 0, 0
model.train()
# 动态修改参数学习率
if epoch % 5 == 0:
optimizer.param_groups[0]['lr'] *= 0.9
print("学习率: {:.6f}".format(optimizer.param_groups[0]['lr']))
for img, label in train_loader:
img, label = img.to(device), label.to(device)
# 前向传播
out = model(img) # out.shape = (batch_size, 10)
loss = criterion(out, label)
# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 记录误差
train_loss += loss.item() # .item() 将单元素张量转换为Python标量(float或int)
# 计算分类准确率
_, pred = out.max(dim=1) # 返回每行最大值的值和索引,pred.shape = (batch_size,),索引就是对应的数字
num_correct = (pred == label).sum().item() # 预测正确的数量
acc = num_correct / img.shape[0] # img.shape[0] = batch_size
train_acc += acc # 累计每个 batch 的准确率

losses.append(train_loss / len(train_loader))
accuracy.append(train_acc / len(train_loader))

# 在测试集上检验效果
eval_loss = 0
eval_acc = 0
model.eval() # 将模型改为预测模式
for img, label in test_loader:
img, label = img.to(device), label.to(device)
out = model(img)
loss = criterion(out, label)
eval_loss += loss.item()
_, pred = out.max(dim=1)
num_correct = (pred == label).sum().item()
acc = num_correct / img.shape[0]
eval_acc += acc
eval_losses.append(eval_loss / len(test_loader))
eval_accuracy.append(eval_acc / len(test_loader))
print('epoch: {}, Train Loss: {:.4f}, Train Acc: {:.4f}, Test Loss: {:.4f}, Test Acc: {:.4f}'
.format(epoch, train_loss/len(train_loader), train_acc/len(train_loader), eval_loss/len(test_loader), eval_acc/len(test_loader)))


# 绘图
plt.title('train loss')
plt.plot(np.arange(len(losses)) + 1, losses)
plt.legend(['Train Loss'], loc='upper right')
plt.show()