概述
MNIST手写数字集和FashionMNIST都是基础的图像分类数据集,你可以直接用它们来测试机器学习和深度学习算法的性能;其中FashionMNIST是更贴近当代的基准。
本文首先基于MNIST构建了一个784×256×10的全连接网络来进行基础测试,之后给出一个卷积神经网络在FashionMNIST上的实现。
我的所有源码皆放在了本仓库
构建模型
from torch import nn
from torch.nn import init
import numpy as np
class FlattenLayer(nn.Module):
    """Collapse every non-batch dimension of the input into a single axis."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        # x arrives as (batch, *, *, ...): keep axis 0 and merge the rest.
        batch = x.size(0)
        return x.view(batch, -1)
# Layer widths: 784 input features -> 256 hidden units -> 10 outputs.
num_inputs = 784
num_outputs = 10
num_hiddens = 256

# Flatten the input, then apply one hidden layer with a ReLU non-linearity
# followed by the output layer.
net = nn.Sequential(
    FlattenLayer(),
    nn.Linear(in_features=num_inputs, out_features=num_hiddens),
    nn.ReLU(),
    nn.Linear(in_features=num_hiddens, out_features=num_outputs),
)
将图像压平输入网络,以ReLU为非线性激活单元。
定义优化函数为Adam,损失函数为cross entropy
事实上MNIST的优化曲线十分简单,用SGD和Adam并没有太大区别
# Optimizer: Adam over every parameter of the network.
optimizer = torch.optim.Adam(params=net.parameters(), lr=0.02)
# Alternative kept for reference: SGD with weight decay applied to the
# weights only (weight_decay is 0 for the biases).
# optimizer = torch.optim.SGD([
#     {'params': [weight_p for name, weight_p in net.named_parameters() if 'weight' in name], 'weight_decay': 1e-5},
#     {'params': [bias_p for name, bias_p in net.named_parameters() if 'bias' in name], 'weight_decay': 0}
# ],
#     lr=0.2)

# Loss function: cross entropy.
loss = torch.nn.CrossEntropyLoss()
进行训练
num_epochs = 5
for epoch in range(num_epochs):
    # evaluate_accuracy() puts the network in eval mode for inference, so
    # training mode is re-enabled explicitly at the start of every epoch.
    net.train()
    train_l_sum, train_acc_sum, n = 0.0, 0.0, 0
    for X, y in train_iter:
        y_hat = net(X)
        # CrossEntropyLoss() with default settings already returns the mean
        # loss over the batch, so no further reduction is needed.
        l = loss(y_hat, y)
        # Clear stale gradients, back-propagate, then update the parameters.
        optimizer.zero_grad()
        l.backward()
        optimizer.step()
        batch = y.shape[0]
        # Weight the batch mean by the batch size so that train_l_sum / n is
        # the per-sample mean loss rather than mean / batch_size.
        train_l_sum += l.item() * batch
        train_acc_sum += (y_hat.argmax(dim=1) == y).sum().item()
        n += batch
    test_acc = evaluate_accuracy(test_iter, net)
    print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f'
          % (epoch + 1, train_l_sum / n, train_acc_sum / n, test_acc))
训练结果如下
可见MNIST数据集的确是较为容易的,仅仅训练5个epoch就可以获得0.97的test acc。
比如FashionMNIST官方就对MNIST做出了这样的评价
- MNIST太简单了。
- MNIST被用烂了。
- MNIST数字识别的任务不代表现代机器学习。
MNIST适合初学者做第一个模型,比如上面,下面附上所有代码
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import time
print(torch.__version__)
print(torchvision.__version__)
# 数据处理
import os
import torch
from torch.utils import data
from PIL import Image
import numpy as np
from torchvision import transforms
# Preprocessing pipeline applied to every loaded image.
transform = transforms.Compose(
    [
        # Convert the image to a Tensor, scaled to [0, 1].
        transforms.ToTensor(),
        # Optional standardisation to [-1, 1] (left disabled):
        # transforms.Normalize(mean=[.5, .5, .5], std=[.5, .5, .5])
    ]
)
#定义自己的数据集合
class ImgSet(data.Dataset):
    """Image-folder dataset with one sub-directory per class.

    Each sub-directory of the root is named after its integer label and
    contains the image files of that class.

    Args:
        Train: use the training root when true, the test root otherwise.
        root: optional directory that overrides the built-in root.
        img_transform: optional callable applied to every opened image;
            defaults to the module-level ``transform`` pipeline.

    Raises:
        ValueError: if a class sub-directory name is not an integer.
    """

    # NOTE(review): these literals are reproduced exactly as found; their
    # path separators look like they were lost -- confirm the real paths or
    # pass ``root`` explicitly.
    TRAIN_ROOT = r"D:DesktopAI学习笔记doworkpytorchdatamnist_train"
    TEST_ROOT = r"D:DesktopAI学习笔记doworkpytorchdatamnist_test"

    def __init__(self, Train: bool, root=None, img_transform=None):
        self.transforms = transform if img_transform is None else img_transform
        if root is None:
            root = self.TRAIN_ROOT if Train else self.TEST_ROOT
        self.imgs = []    # image file paths
        self.labels = []  # integer label of the image at the same index
        # Sorted listings make the sample order deterministic.
        for label_name in sorted(os.listdir(root)):
            label_dir = os.path.join(root, label_name)
            if not os.path.isdir(label_dir):
                continue  # ignore stray files next to the class folders
            # The label comes from the folder name, not from a fixed
            # position in the split path, so any root depth works.
            label = int(label_name)
            # Every file is kept; the training split used to drop the last
            # listed file of each class.
            for file_name in sorted(os.listdir(label_dir)):
                self.imgs.append(os.path.join(label_dir, file_name))
                self.labels.append(label)

    def __getitem__(self, index):
        """Return ``(transformed_image, label)`` for the sample at ``index``."""
        # The context manager closes the underlying file once transformed.
        with Image.open(self.imgs[index]) as pil_img:
            sample = self.transforms(pil_img)
        return sample, self.labels[index]

    def __len__(self):
        """Return the number of image files found."""
        return len(self.imgs)
# Build the training and test datasets.
train_set = ImgSet(True)
test_set = ImgSet(False)

# Mini-batch loaders: the training data is reshuffled, the test data is not.
batch_size = 256
num_workers = 0
train_iter = DataLoader(
    dataset=train_set,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
)
test_iter = DataLoader(
    dataset=test_set,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers,
)
def evaluate_accuracy(data_iter, net, device=torch.device('cpu'), batchNormalizationAndDropout=False):
    """Return the classification accuracy of ``net`` over ``data_iter``.

    Args:
        data_iter: iterable of ``(X, y)`` batches.
        net: model to evaluate; its train/eval mode is restored on exit.
        device: device every batch is copied to before the forward pass.
        batchNormalizationAndDropout: when true the model is evaluated in
            training mode, otherwise in eval mode.

    Returns:
        Fraction of samples whose arg-max prediction equals the label, or
        ``0.0`` when ``data_iter`` yields no samples.
    """
    was_training = net.training
    # The mode is selected once instead of on every batch.
    net.train(bool(batchNormalizationAndDropout))
    correct, n = 0, 0
    try:
        with torch.no_grad():
            for X, y in data_iter:
                X, y = X.to(device), y.to(device).long()
                # argmax over dim 1 picks the predicted class of each row.
                correct += (net(X).argmax(dim=1) == y).sum().item()
                n += y.shape[0]
    finally:
        # Leaving the model in eval mode would silently disable Dropout for
        # any training that continues after this call.
        net.train(was_training)
    return correct / n if n else 0.0
## 定义模型
from torch import nn
from torch.nn import init
import numpy as np
class FlattenLayer(nn.Module):
    """Reshape a (batch, *, *, ...) tensor into (batch, features)."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        # Keep the leading batch axis; -1 lets view() infer the merged size.
        rows = x.size(0)
        flat = x.view(rows, -1)
        return flat
# Layer widths: 784 input features -> 256 hidden units -> 10 outputs.
num_inputs = 784
num_outputs = 10
num_hiddens = 256

# Same perceptron as before, with a Dropout layer (default rate) inserted
# between the hidden linear layer and its ReLU.
net = nn.Sequential(
    FlattenLayer(),
    nn.Linear(in_features=num_inputs, out_features=num_hiddens),
    nn.Dropout(),
    nn.ReLU(),
    nn.Linear(in_features=num_hiddens, out_features=num_outputs),
)
# Initialise every parameter from a normal distribution with mean 0 and
# standard deviation 0.01.
for params in net.parameters():
    init.normal_(params, mean=0, std=0.01)

# Optimizer: Adam over every parameter of the network.
optimizer = torch.optim.Adam(params=net.parameters(), lr=0.02)
# Alternative kept for reference: SGD with weight decay applied to the
# weights only (weight_decay is 0 for the biases).
# optimizer = torch.optim.SGD([
#     {'params': [weight_p for name, weight_p in net.named_parameters() if 'weight' in name], 'weight_decay': 1e-5},
#     {'params': [bias_p for name, bias_p in net.named_parameters() if 'bias' in name], 'weight_decay': 0}
# ],
#     lr=0.2)

# Loss function: cross entropy.
loss = torch.nn.CrossEntropyLoss()
num_epochs = 5
for epoch in range(num_epochs):
    # evaluate_accuracy() puts the network in eval mode for inference.
    # Without re-enabling training mode here, the Dropout layer would stay
    # inactive for every epoch after the first evaluation.
    net.train()
    train_l_sum, train_acc_sum, n = 0.0, 0.0, 0
    for X, y in train_iter:
        y_hat = net(X)
        # CrossEntropyLoss() with default settings already returns the mean
        # loss over the batch, so no further reduction is needed.
        l = loss(y_hat, y)
        # Clear stale gradients, back-propagate, then update the parameters.
        optimizer.zero_grad()
        l.backward()
        optimizer.step()
        batch = y.shape[0]
        # Weight the batch mean by the batch size so that train_l_sum / n is
        # the per-sample mean loss rather than mean / batch_size.
        train_l_sum += l.item() * batch
        train_acc_sum += (y_hat.argmax(dim=1) == y).sum().item()
        n += batch
    test_acc = evaluate_accuracy(test_iter, net)
    print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f'
          % (epoch + 1, train_l_sum / n, train_acc_sum / n, test_acc))
下面实现的是对于FashionMNIST的卷积神经网络
构建模型
def nin_block(in_channels, out_channels, kernel_size, stride, padding):
    """Build one block: a spatial convolution followed by two 1x1 convolutions.

    Every convolution is followed by a ReLU. The 1x1 convolutions keep the
    channel count at ``out_channels``.
    """

    def pointwise():
        # 1x1 convolution that mixes channels without changing their number.
        return nn.Conv2d(out_channels, out_channels, kernel_size=1)

    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding),
        nn.ReLU(),
        pointwise(),
        nn.ReLU(),
        pointwise(),
        nn.ReLU(),
    )
# Network built from three nin_block stages.
net = nn.Sequential(
    nin_block(in_channels=1, out_channels=96, kernel_size=11, stride=4, padding=0),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nin_block(in_channels=96, out_channels=256, kernel_size=5, stride=1, padding=2),
    nn.Dropout(p=0.5),
    # The last block emits 10 channels, one per label class.
    nin_block(in_channels=256, out_channels=10, kernel_size=3, stride=1, padding=1),
    utils.GlobalAvgPool2d(),
    # Turn the four-dimensional output into a two-dimensional one of shape
    # (batch size, 10).
    utils.FlattenLayer(),
)
损失函数与优化函数与上一个模型相同
下面是所有代码
import torch
from torch import nn
from torchvision import datasets
from FastAI import utils
from torchvision import transforms
# Datasets. This section targets FashionMNIST, so the FashionMNIST loader is
# used here instead of the plain MNIST digits loader; the files are
# downloaded into the data directory when missing.
train_set = datasets.FashionMNIST('d:/Desktop/data', train=True, transform=transforms.ToTensor(), download=True)
test_set = datasets.FashionMNIST('d:/Desktop/data', train=False, transform=transforms.ToTensor(), download=True)
## 模型
def nin_block(in_channels, out_channels, kernel_size, stride, padding):
    """Return a Sequential of one spatial conv and two 1x1 convs, each with ReLU."""
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding),
        nn.ReLU(),
    ]
    # Two channel-preserving 1x1 convolutions, each followed by a ReLU.
    for _ in range(2):
        layers.append(nn.Conv2d(out_channels, out_channels, kernel_size=1))
        layers.append(nn.ReLU())
    return nn.Sequential(*layers)
# Network built from three nin_block stages.
net = nn.Sequential(
    nin_block(in_channels=1, out_channels=96, kernel_size=11, stride=4, padding=0),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nin_block(in_channels=96, out_channels=256, kernel_size=5, stride=1, padding=2),
    nn.Dropout(p=0.5),
    # The last block emits 10 channels, one per label class.
    nin_block(in_channels=256, out_channels=10, kernel_size=3, stride=1, padding=1),
    utils.GlobalAvgPool2d(),
    # Turn the four-dimensional output into a two-dimensional one of shape
    # (batch size, 10).
    utils.FlattenLayer(),
)
# Loss function: cross entropy.
loss = torch.nn.CrossEntropyLoss()

# Optimizer: Adam with the learning rate below.
lr = 0.02
optimizer = torch.optim.Adam(params=net.parameters(), lr=lr)

# Positional arguments: model, training set, test set, optimizer, loss,
# number of epochs (5) and batch size (256).
utils.train_demo(net, train_set, test_set, optimizer, loss, 5, 256)
另外附上我的工具包
utils
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from .visual import train_with_visualization
## 工具
def try_gpu():
    """Return the first CUDA device when CUDA is available, else the CPU."""
    device_name = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)
def evaluate_accuracy(data_iter, net, device=None, batchNormalizationAndDropout=False):
    """Return the classification accuracy of ``net`` over ``data_iter``.

    Args:
        data_iter: iterable of ``(X, y)`` batches.
        net: model to evaluate; its train/eval mode is restored on exit.
        device: device every batch is copied to. When omitted, the device of
            the model's first parameter is used so inputs and weights always
            match; a model without parameters falls back to ``try_gpu()``.
        batchNormalizationAndDropout: when true the model is evaluated in
            training mode, otherwise in eval mode.

    Returns:
        Fraction of samples whose arg-max prediction equals the label, or
        ``0.0`` when ``data_iter`` yields no samples.
    """
    if device is None:
        # Resolved at call time: a default computed at definition time could
        # name a device the model does not live on.
        first_param = next(net.parameters(), None)
        device = first_param.device if first_param is not None else try_gpu()
    was_training = net.training
    # The mode is selected once instead of on every batch.
    net.train(bool(batchNormalizationAndDropout))
    correct, n = 0, 0
    try:
        with torch.no_grad():
            for X, y in data_iter:
                X, y = X.to(device), y.to(device).long()
                # argmax over dim 1 picks the predicted class of each row.
                correct += (net(X).argmax(dim=1) == y).sum().item()
                n += y.shape[0]
    finally:
        # Leaving the model in eval mode would silently disable Dropout for
        # any training that continues after this call.
        net.train(was_training)
    return correct / n if n else 0.0
def show_output_shape(net, batch_size, data_set):
    """Feed a random batch through ``net`` and print each child's output shape.

    The random input has ``batch_size`` rows, each shaped like the first
    sample of ``data_set``.
    """
    sample_shape = data_set[0][0].shape
    X = torch.rand(batch_size, *sample_shape)
    for name, blk in net.named_children():
        X = blk(X)
        print(name, 'output shape: ', X.shape)
## 网络
class FlattenLayer(nn.Module):
    """Flatten the input tensor, keeping only the batch dimension."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        # x has shape (batch, *, *, ...); the result is (batch, features).
        n_rows = x.size(0)
        return x.view(n_rows, -1)
class GlobalAvgPool2d(nn.Module):
    """Global average pooling: the pooling window is the input's full height and width."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        # Dimensions from index 2 onward are used as the pooling window.
        window = x.shape[2:]
        return F.avg_pool2d(x, kernel_size=window)
## 优化函数
def SGD_L2(net, lr, weight_decay):
    """Build an SGD optimizer with L2 weight decay on weights but not biases.

    Parameters whose name contains ``'weight'`` form the first group and use
    ``weight_decay``; parameters whose name contains ``'bias'`` form the
    second group with a decay of 0. Parameters matching neither name are not
    handed to the optimizer.

    Args:
        net: module whose named parameters are optimised.
        lr: learning rate shared by both groups.
        weight_decay: L2 coefficient applied to the weight group.

    Returns:
        The configured ``torch.optim.SGD`` instance.
    """
    named = list(net.named_parameters())
    weights = [param for name, param in named if 'weight' in name]
    biases = [param for name, param in named if 'bias' in name]
    # The optimizer used to be constructed and then discarded, so callers
    # always received None; it is now returned.
    return torch.optim.SGD(
        [
            {'params': weights, 'weight_decay': weight_decay},
            {'params': biases, 'weight_decay': 0},
        ],
        lr=lr,
    )
### 经典论文中的网络块
#### modernCNN中的块
class Inception(nn.Module):
    """Inception block from GoogLeNet: four parallel paths joined on the channel axis.

    ``c1`` and ``c4`` are the output channel counts of paths 1 and 4;
    ``c2`` and ``c3`` are pairs giving the output channels of the two
    convolutions on paths 2 and 3.
    """

    def __init__(self, in_c, c1, c2, c3, c4):
        super().__init__()
        # Path 1: a single 1x1 convolution.
        self.p1_1 = nn.Conv2d(in_c, c1, kernel_size=1)
        # Path 2: 1x1 convolution, then a 3x3 convolution.
        self.p2_1 = nn.Conv2d(in_c, c2[0], kernel_size=1)
        self.p2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)
        # Path 3: 1x1 convolution, then a 5x5 convolution.
        self.p3_1 = nn.Conv2d(in_c, c3[0], kernel_size=1)
        self.p3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)
        # Path 4: 3x3 max pooling, then a 1x1 convolution.
        self.p4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.p4_2 = nn.Conv2d(in_c, c4, kernel_size=1)

    def forward(self, x):
        path1 = F.relu(self.p1_1(x))

        path2 = F.relu(self.p2_1(x))
        path2 = F.relu(self.p2_2(path2))

        path3 = F.relu(self.p3_1(x))
        path3 = F.relu(self.p3_2(path3))

        path4 = self.p4_1(x)
        path4 = F.relu(self.p4_2(path4))

        # Concatenate the four outputs along the channel dimension.
        return torch.cat((path1, path2, path3, path4), dim=1)
#### 残差网络(ResNet)
class Residual(nn.Module):
    """Residual block.

    The output channel count, the stride of the first convolution and the
    optional extra 1x1 convolution on the shortcut (used to change the
    channel count) are configurable.
    """

    def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        # Optional projection so the shortcut matches the main path's output.
        self.conv3 = (
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
            if use_1x1conv
            else None
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, X):
        main = F.relu(self.bn1(self.conv1(X)))
        main = self.bn2(self.conv2(main))
        shortcut = X if self.conv3 is None else self.conv3(X)
        # Add the shortcut before the final activation.
        return F.relu(main + shortcut)
### 稠密连接网络(DenseNet)
def conv_block(in_channels, out_channels):
    """Return BatchNorm -> ReLU -> 3x3 convolution (padding 1) as a Sequential."""
    norm = nn.BatchNorm2d(in_channels)
    activation = nn.ReLU()
    conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
    return nn.Sequential(norm, activation, conv)
class DenseBlock(nn.Module):
    """Dense block: each conv_block sees the concatenation of all earlier outputs."""

    def __init__(self, num_convs, in_channels, out_channels):
        super().__init__()
        # The i-th block receives the original input channels plus the
        # out_channels added by each of the i blocks before it.
        blocks = [
            conv_block(in_channels + idx * out_channels, out_channels)
            for idx in range(num_convs)
        ]
        self.net = nn.ModuleList(blocks)
        # Channel count of the tensor returned by forward().
        self.out_channels = in_channels + num_convs * out_channels

    def forward(self, X):
        features = X
        for layer in self.net:
            new_features = layer(features)
            # Join input and output along the channel dimension.
            features = torch.cat((features, new_features), dim=1)
        return features
def transition_block(in_channels, out_channels):
    """Transition layer: BatchNorm, ReLU, 1x1 convolution, then 2x2 average pooling."""
    stages = [
        nn.BatchNorm2d(in_channels),
        nn.ReLU(),
        # The 1x1 convolution changes the channel count to out_channels.
        nn.Conv2d(in_channels, out_channels, kernel_size=1),
        # Stride-2 average pooling over 2x2 windows.
        nn.AvgPool2d(kernel_size=2, stride=2),
    ]
    return nn.Sequential(*stages)
## 训练demo
@train_with_visualization(show_every_epoch=1, logpath="logs")
def train_demo(net, train_set, test_set, optimizer, loss_function, num_epochs, batch_size, device=try_gpu(), num_workers=0):
    """Train ``net`` and yield per-epoch metrics for the visualisation decorator.

    Args:
        net: model to train; it is moved to ``device``.
        train_set: dataset used for training (reshuffled by its loader).
        test_set: dataset used for the per-epoch accuracy check.
        optimizer: optimizer already bound to ``net``'s parameters.
        loss_function: callable mapping ``(y_hat, y)`` to a scalar loss.
        num_epochs: number of passes over ``train_set``.
        batch_size: mini-batch size of both loaders.
        device: device the model and every batch are placed on.
        num_workers: worker processes used by the data loaders.

    Yields:
        ``(epoch, mean_train_loss, train_accuracy, test_accuracy)`` once per
        epoch, with ``epoch`` starting at 1.
    """
    # Data loaders.
    train_iter = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    test_iter = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    # Batches are copied to `device` below, so the model has to live there too.
    net.to(device)
    # A loss that reports reduction == 'mean' returns a batch average; it is
    # re-weighted by the batch size so the running sum divided by the sample
    # count is a true per-sample mean. Any other loss is summed unchanged.
    mean_reduced = getattr(loss_function, 'reduction', None) == 'mean'
    for epoch in range(num_epochs):
        # evaluate_accuracy() puts the model in eval mode for inference, so
        # training mode is re-enabled at the start of every epoch.
        net.train()
        # Metrics are reset per epoch so each yield describes that epoch only.
        train_l_sum, train_acc_sum, n = 0.0, 0.0, 0
        for X, y in train_iter:
            X = X.to(device)
            y = y.to(device)
            # Forward pass.
            y_hat = net(X)
            loss = loss_function(y_hat, y)
            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            batch = y.shape[0]
            train_l_sum += loss.item() * (batch if mean_reduced else 1)
            train_acc_sum += (y_hat.argmax(dim=1) == y).sum().item()
            n += batch
        # Evaluate on the same device the model was trained on.
        test_acc = evaluate_accuracy(test_iter, net, device)
        seen = max(n, 1)  # guards against an empty training set
        # Hand the metrics to the decorator, which visualises the training.
        yield epoch + 1, train_l_sum / seen, train_acc_sum / seen, test_acc
visual
import time
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
import os
from functools import wraps
def to_np(x):
    """Return the values of tensor ``x`` as a NumPy array on the CPU."""
    detached = x.detach()
    on_cpu = detached.cpu()
    return on_cpu.numpy()
def train_with_visualization(show_every_epoch=5, logpath="logs"):
    """Decorator that visualises a training generator with tqdm and TensorBoard.

    The decorated function must be a generator yielding
    ``(epoch, loss, train_acc, test_acc)`` tuples. The model and the epoch
    count are taken from its ``net`` and ``num_epochs`` arguments; when the
    function has no parameters with those names, positional arguments 0 and
    5 are used instead.

    Args:
        show_every_epoch: log and refresh the progress bar every this many
            epochs (default 5).
        logpath: TensorBoard log directory (default ``"logs"``).

    Returns:
        A decorator. The wrapped function drives the generator to completion,
        starts a TensorBoard process on port 6007 and returns ``None``.
    """
    import inspect
    import subprocess

    def func_wrapper(func):
        signature = inspect.signature(func)

        @wraps(func)
        def new_func(*args, **kwargs):
            # Resolve the arguments by name so that keyword arguments and
            # extra optional positionals (device, num_workers, ...) work.
            arguments = signature.bind(*args, **kwargs).arguments
            if 'net' in arguments and 'num_epochs' in arguments:
                net, num_epochs = arguments['net'], arguments['num_epochs']
            else:
                net, num_epochs = args[0], args[5]
            write = SummaryWriter(logpath)
            pbar = tqdm(total=num_epochs)
            try:
                for epoch, loss, train_acc, test_acc in func(*args, **kwargs):
                    if epoch % show_every_epoch != 0:
                        continue
                    pbar.set_description(f'|loss {loss:.3f}|train_acc {train_acc:.3f}|test_acc {test_acc:.3f}|')
                    pbar.update(show_every_epoch)
                    # ============ TensorBoard logging ============ #
                    # (1) Log the scalar values.
                    info = {
                        'loss': loss,
                        'train acc': train_acc,
                        'test acc': test_acc
                    }
                    write.add_scalars('loss&acc', info, epoch)
                    # (2) Log values and gradients of the parameters.
                    for tag, value in net.named_parameters():
                        tag = tag.replace('.', '/')
                        write.add_histogram(tag, to_np(value), epoch)
                        # Parameters that received no gradient have none to log.
                        if value.grad is not None:
                            write.add_histogram(tag + '/grad', to_np(value.grad), epoch)
            finally:
                # Release the progress bar and flush the log even when
                # training raises.
                pbar.close()
                write.close()
            # Open TensorBoard. An argument list without a shell replaces the
            # Windows-only `start` command, so this works on any platform.
            try:
                subprocess.Popen(["tensorboard", f"--logdir={logpath}", "--port=6007"])
            except OSError as err:
                print(f"could not start tensorboard: {err}")
        return new_func
    return func_wrapper
另外推荐此开源项目,使基础的网络实现更容易调包
https://github.com/PyTorchLightning/pytorch-lightning.git
类似于tensorflow的keras
最后
以上就是无聊吐司为你收集整理的机器学习入门之FashionMNIST图像分类的全部内容,希望文章能够帮你解决机器学习入门之FashionMNIST图像分类所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复