This post first implements a simple CNN in PyTorch to classify the objects in CIFAR-10 images, and then implements a ResNet for the same CIFAR-10 classification task.
Before we start, let's briefly introduce the dataset used in this experiment: CIFAR-10.
CIFAR-10
CIFAR-10 is a small dataset for recognizing everyday objects, compiled by Alex Krizhevsky and Ilya Sutskever, students of Hinton. It contains RGB color images in 10 classes: airplane, automobile, bird, cat, deer, dog, frog, horse, ship, and truck. Each image is 32×32, and the dataset contains 50,000 training images and 10,000 test images (6,000 images per class). Sample CIFAR-10 images are shown in the figure below.
Compared with the MNIST dataset, CIFAR-10 differs in the following ways:
• CIFAR-10 images are 3-channel RGB color images, while MNIST images are grayscale.
• CIFAR-10 images are 32×32, slightly larger than MNIST's 28×28.
• Unlike handwritten digits, CIFAR-10 contains real-world objects; the images are noisy and the objects vary widely in scale and appearance, which makes recognition much harder. A plain linear model such as Softmax performs poorly on CIFAR-10.
The figure below lists the 10 classes, with 10 random images shown for each class:
Implementing a Simple CNN
Loading the Dataset
This takes two steps: first build the dataset (this step also covers data augmentation such as rotation and cropping), then load the data with a DataLoader. We do this for both the training set and the test set, but remember that one boolean argument (the `train` flag) must be set differently for the two.
```python
batchsz = 32

cifar_train = datasets.CIFAR10('cifar_data', True,
                               transform=transforms.Compose([
                                   transforms.Resize([32, 32]),
                                   transforms.ToTensor()
                               ]),
                               download=True)
cifar_train = DataLoader(cifar_train, batch_size=batchsz, shuffle=True)

cifar_test = datasets.CIFAR10('cifar_data', False,
                              transform=transforms.Compose([
                                  transforms.Resize([32, 32]),
                                  transforms.ToTensor()
                              ]),
                              download=True)
cifar_test = DataLoader(cifar_test, batch_size=batchsz, shuffle=True)
```
We can then use the following two lines to quickly check that the data was loaded successfully:
```python
x, label = next(iter(cifar_train))
print('x:', x.shape)
```
Implementing the LeNet-5 Network
From the theory covered earlier, we know that LeNet-5 consists of two main parts, a convolutional part and a fully connected part, with a Flatten operation in between. One way to implement the network is as follows:
```python
class LeNet5(nn.Module):
    '''
    for CIFAR10 dataset
    '''
    def __init__(self):
        super(LeNet5, self).__init__()

        # convolutional part
        self.conv_unit = nn.Sequential(
            nn.Conv2d(3, 6, kernel_size=5, stride=1, padding=0),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0),
            nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0),
        )

        # fully connected part
        self.fc_unit = nn.Sequential(
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(inplace=True),
            nn.Linear(120, 84),
            nn.ReLU(inplace=True),
            nn.Linear(84, 10)
        )

        # quick shape check for the convolutional part
        tmp = torch.randn(2, 3, 32, 32)
        out = self.conv_unit(tmp)
        print('conv out', out.shape)

    def forward(self, x):
        '''
        :param x: [batchsz, 3, 32, 32]
        :return: logits
        '''
        batchsz = x.size(0)
        x = self.conv_unit(x)
        x = x.view(batchsz, 16 * 5 * 5)
        logits = self.fc_unit(x)
        return logits
```
Notice that we used two `Sequential` containers above and joined them in `forward`. If that feels a bit cumbersome, we can instead make the Flatten operation a subclass of `nn.Module` so that it can be placed inside a `Sequential`, which gives us the following version.
```python
import torch
from torch import nn
import torch.nn.functional as F


class Flatten(nn.Module):
    def __init__(self):
        super(Flatten, self).__init__()

    def forward(self, input):
        return input.view(input.size(0), -1)


class LeNet5(nn.Module):
    '''
    for CIFAR10 dataset
    '''
    def __init__(self):
        super(LeNet5, self).__init__()

        self.nn_unit = nn.Sequential(
            nn.Conv2d(3, 6, kernel_size=5, stride=1, padding=0),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0),
            nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0),
            Flatten(),
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(inplace=True),
            nn.Linear(120, 84),
            nn.ReLU(inplace=True),
            nn.Linear(84, 10)
        )

        tmp = torch.randn(2, 3, 32, 32)
        out = self.nn_unit(tmp)
        print('nn out', out.shape)

    def forward(self, x):
        '''
        :param x: [batchsz, 3, 32, 32]
        :return: logits
        '''
        logits = self.nn_unit(x)
        return logits


def main():
    net = LeNet5()
    tmp = torch.randn(2, 3, 32, 32)
    out = net(tmp)
    print('LeNet out', out.shape)


if __name__ == '__main__':
    main()
```
As the code above shows, when we are not sure about the shape of the tensor going into the CNN, we can write the following in the main function:
```python
def main():
    net = LeNet5()
    tmp = torch.randn(2, 3, 32, 32)
    out = net(tmp)
    print('LeNet out', out.shape)
```
Alternatively, we can test while the network is being built by adding the following to the `__init__` method of our class:
```python
tmp = torch.randn(2, 3, 32, 32)
out = self.nn_unit(tmp)
print('nn out', out.shape)
```
Training
The steps are the same as for the fully connected network from before. What is new here is training on the GPU, using `CrossEntropyLoss()` as the criterion (unlike before), and using Adam as the optimizer.
```python
device = torch.device('cuda')
model = LeNet5().to(device)
print(model)

criteon = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(1000):
    for batchidx, (x, label) in enumerate(cifar_train):
        x, label = x.to(device), label.to(device)

        logits = model(x)
        loss = criteon(logits, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
```
`enumerate` works as follows: it simply attaches an index to each item of an iterable:

```python
arr = ['a', 'b', 'c', 'd']
for idx, item in enumerate(arr):
    print(idx, ":", item)
```
Implementing Validation
After every training epoch we run validation, so that we know whether the model is training well. The validation code is essentially the same as for the earlier fully connected network, so it is not explained in detail again.
```python
total_correct = 0
total_num = 0
for x, label in cifar_test:
    x, label = x.to(device), label.to(device)

    logits = model(x)
    pred = logits.argmax(dim=1)
    total_correct += torch.eq(pred, label).float().sum().item()
    total_num += x.size(0)

acc = total_correct / total_num
print('acc', acc)
Note the difference between `torch.eq` and `torch.equal`.
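As a quick illustration (this small example is not part of the training code above): `torch.eq` compares element-wise and returns a tensor of booleans, while `torch.equal` returns a single Python bool that is `True` only if the two tensors have the same shape and identical elements.

```python
import torch

a = torch.tensor([1, 2, 3])
b = torch.tensor([1, 0, 3])

print(torch.eq(a, b))     # tensor([ True, False,  True]) -- element-wise comparison
print(torch.equal(a, b))  # False -- a single bool, True only if shape and all elements match

# this is why the accuracy code above sums the result of torch.eq
print(torch.eq(a, b).float().sum().item())  # 2.0 -- number of matching positions
```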
Train+Val
The combined code is as follows:
```python
for epoch in range(1000):
    for batchidx, (x, label) in enumerate(cifar_train):
        x, label = x.to(device), label.to(device)

        logits = model(x)
        loss = criteon(logits, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(epoch, loss.item())

    total_correct = 0
    total_num = 0
    for x, label in cifar_test:
        x, label = x.to(device), label.to(device)

        logits = model(x)
        pred = logits.argmax(dim=1)
        total_correct += torch.eq(pred, label).float().sum().item()
        total_num += x.size(0)

    acc = total_correct / total_num
    print('acc', acc)
```
Other Details
Streamlining the Validation Code
Since validation only evaluates the model, it needs no gradient information and no backpropagation to update parameters, so we can wrap the validation code above in `with torch.no_grad():`. This tells PyTorch that the enclosed code does not need gradients, which makes the code safer.
```python
with torch.no_grad():
    total_correct = 0
    total_num = 0
    for x, label in cifar_test:
        x, label = x.to(device), label.to(device)

        logits = model(x)
        pred = logits.argmax(dim=1)
        total_correct += torch.eq(pred, label).float().sum().item()
        total_num += x.size(0)

    acc = total_correct / total_num
    print('acc', acc)
```
Switching Between Train and Eval Modes
Some of the layers in the networks we build behave differently during training and testing. A `BatchNorm` layer, for example, behaves somewhat differently in training mode than in evaluation mode. Switching the state of every such layer by hand would be very tedious, but `nn.Module` can switch the state of the whole custom network at once, which greatly simplifies things.
To do this, we only need to add two lines of code:
```diff
 for epoch in range(1000):
+    model.train()
     for batchidx, (x, label) in enumerate(cifar_train):
         x, label = x.to(device), label.to(device)

         logits = model(x)
         loss = criteon(logits, label)

         optimizer.zero_grad()
         loss.backward()
         optimizer.step()

     print(epoch, loss.item())

+    model.eval()
     with torch.no_grad():
         total_correct = 0
         total_num = 0
         for x, label in cifar_test:
             x, label = x.to(device), label.to(device)

             logits = model(x)
             pred = logits.argmax(dim=1)
             total_correct += torch.eq(pred, label).float().sum().item()
             total_num += x.size(0)

         acc = total_correct / total_num
         print('acc', acc)
```
Complete Code
main.py
```python
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision import transforms
from LeNet5 import LeNet5
from torch import optim


def main():
    batchsz = 32

    cifar_train = datasets.CIFAR10('cifar_data', True,
                                   transform=transforms.Compose([
                                       transforms.Resize([32, 32]),
                                       transforms.ToTensor()
                                   ]),
                                   download=True)
    cifar_train = DataLoader(cifar_train, batch_size=batchsz, shuffle=True)

    cifar_test = datasets.CIFAR10('cifar_data', False,
                                  transform=transforms.Compose([
                                      transforms.Resize([32, 32]),
                                      transforms.ToTensor()
                                  ]),
                                  download=True)
    cifar_test = DataLoader(cifar_test, batch_size=batchsz, shuffle=True)

    x, label = next(iter(cifar_train))
    print('x:', x.shape)

    device = torch.device('cuda')
    model = LeNet5().to(device)
    print(model)

    criteon = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(1000):
        model.train()
        for batchidx, (x, label) in enumerate(cifar_train):
            x, label = x.to(device), label.to(device)

            logits = model(x)
            loss = criteon(logits, label)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        print(epoch, loss.item())

        model.eval()
        with torch.no_grad():
            total_correct = 0
            total_num = 0
            for x, label in cifar_test:
                x, label = x.to(device), label.to(device)

                logits = model(x)
                pred = logits.argmax(dim=1)
                total_correct += torch.eq(pred, label).float().sum().item()
                total_num += x.size(0)

            acc = total_correct / total_num
            print('acc', acc)


if __name__ == '__main__':
    main()
```
LeNet5.py
```python
import torch
from torch import nn
import torch.nn.functional as F


class LeNet5(nn.Module):
    '''
    for CIFAR10 dataset
    '''
    def __init__(self):
        super(LeNet5, self).__init__()

        self.conv_unit = nn.Sequential(
            nn.Conv2d(3, 6, kernel_size=5, stride=1, padding=0),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0),
            nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0),
        )

        self.fc_unit = nn.Sequential(
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(inplace=True),
            nn.Linear(120, 84),
            nn.ReLU(inplace=True),
            nn.Linear(84, 10)
        )

        tmp = torch.randn(2, 3, 32, 32)
        out = self.conv_unit(tmp)
        print('conv out', out.shape)

    def forward(self, x):
        '''
        :param x: [batchsz, 3, 32, 32]
        :return: logits
        '''
        batchsz = x.size(0)
        x = self.conv_unit(x)
        x = x.view(batchsz, 16 * 5 * 5)
        logits = self.fc_unit(x)
        return logits


def main():
    net = LeNet5()
    tmp = torch.randn(2, 3, 32, 32)
    out = net(tmp)
    print('LeNet out', out.shape)


if __name__ == '__main__':
    main()
```
LeNet5 written with the second approach
LeNet5pro.py
```python
import torch
from torch import nn
import torch.nn.functional as F


class Flatten(nn.Module):
    def __init__(self):
        super(Flatten, self).__init__()

    def forward(self, input):
        return input.view(input.size(0), -1)


class LeNet5(nn.Module):
    '''
    for CIFAR10 dataset
    '''
    def __init__(self):
        super(LeNet5, self).__init__()

        self.nn_unit = nn.Sequential(
            nn.Conv2d(3, 6, kernel_size=5, stride=1, padding=0),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0),
            nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0),
            Flatten(),
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(inplace=True),
            nn.Linear(120, 84),
            nn.ReLU(inplace=True),
            nn.Linear(84, 10)
        )

        tmp = torch.randn(2, 3, 32, 32)
        out = self.nn_unit(tmp)
        print('nn out', out.shape)

    def forward(self, x):
        '''
        :param x: [batchsz, 3, 32, 32]
        :return: logits
        '''
        logits = self.nn_unit(x)
        return logits


def main():
    net = LeNet5()
    tmp = torch.randn(2, 3, 32, 32)
    out = net(tmp)
    print('LeNet out', out.shape)


if __name__ == '__main__':
    main()
```
Implementing ResNet
This experiment uses ResNet-18 as an example, but it differs slightly from the ResNet-18 in the paper because the dataset is not quite the same.
Implementing the Residual Block
This residual block differs a little from the earlier one: a stride is added, so the network can shrink the feature maps while going deeper. The code is below.
One point to note: if the number of input channels differs from the number of output channels, an extra convolution layer is needed on the shortcut path to match the channel counts, so that the two branches can be added.
```python
class ResBlk(nn.Module):
    '''
    ResNet Block
    '''
    def __init__(self, ch_in, ch_out, stride=1):
        '''
        :param ch_in:
        :param ch_out:
        '''
        super(ResBlk, self).__init__()

        self.conv1 = nn.Conv2d(ch_in, ch_out, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(ch_out)
        self.conv2 = nn.Conv2d(ch_out, ch_out, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(ch_out)

        # shortcut branch: identity by default, 1x1 conv when the channel counts differ
        self.extra = nn.Sequential()
        if ch_out != ch_in:
            self.extra = nn.Sequential(
                nn.Conv2d(ch_in, ch_out, kernel_size=1, stride=stride),
                nn.BatchNorm2d(ch_out)
            )

    def forward(self, x):
        '''
        :param x: [batchsz, ch, h, w]
        :return:
        '''
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        # element-wise add of the shortcut branch and the main branch
        out = self.extra(x) + out
        return out
```
Implementing the ResNet-18 Network
The idea is the same as for LeNet-5, so it is not repeated here.
```python
class ResNet18(nn.Module):

    def __init__(self):
        super(ResNet18, self).__init__()

        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=3, padding=0),
            nn.BatchNorm2d(64)
        )
        # four residual blocks
        self.blk1 = ResBlk(64, 128, stride=2)
        self.blk2 = ResBlk(128, 256, stride=2)
        self.blk3 = ResBlk(256, 512, stride=2)
        self.blk4 = ResBlk(512, 512, stride=2)

        self.outlayer = nn.Linear(512, 10)

    def forward(self, x):
        '''
        :param x:
        :return:
        '''
        x = F.relu(self.conv1(x))

        x = self.blk1(x)
        x = self.blk2(x)
        x = self.blk3(x)
        x = self.blk4(x)

        # test code: uncomment to print the intermediate shapes
        # print('after blk4:', x.shape)
        x = F.adaptive_max_pool2d(x, [1, 1])
        # print('after pool:', x.shape)
        x = x.view(x.size(0), -1)
        x = self.outlayer(x)

        return x
```
You can uncomment the `# test code` lines in the code above to print the shapes of the intermediate tensors, which helps you keep track of the shapes flowing through the network.
Remaining Code
The training and validation parts are exactly the same as in the LeNet-5 code above; we only need to change which model gets loaded. First import ResNet:
```python
from ResNet import ResNet18
```
Then slightly modify the line where the model is created and we are done:
```python
model = ResNet18().to(device)
print(model)
```
Code Improvements
Normalization & Data Augmentation
Adding `Normalize` to the data preprocessing improves model performance.
We can also add data augmentation in the data-processing step to improve performance a bit further; a sketch of this follows the code block below.
```python
cifar_train = datasets.CIFAR10('cifar_data', True,
                               transform=transforms.Compose([
                                   transforms.Resize([32, 32]),
                                   transforms.ToTensor(),
                                   transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                        std=[0.229, 0.224, 0.225])
                               ]),
                               download=True)
cifar_train = DataLoader(cifar_train, batch_size=batchsz, shuffle=True)

cifar_test = datasets.CIFAR10('cifar_data', False,
                              transform=transforms.Compose([
                                  transforms.Resize([32, 32]),
                                  transforms.ToTensor(),
                                  transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                       std=[0.229, 0.224, 0.225])
                              ]),
                              download=True)
cifar_test = DataLoader(cifar_test, batch_size=batchsz, shuffle=True)
```
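As a rough sketch of the data augmentation mentioned above (the specific transforms and their parameters are my own choice here, not part of the original code), one could add random cropping and horizontal flipping to the training transform only, keeping the test transform deterministic:

```python
from torchvision import datasets, transforms

# augmentation applies only to the training set; the test set stays deterministic
train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),   # pad by 4 pixels, then crop a random 32x32 patch
    transforms.RandomHorizontalFlip(),      # flip left-right with probability 0.5
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

cifar_train = datasets.CIFAR10('cifar_data', True, transform=train_transform, download=True)
cifar_test = datasets.CIFAR10('cifar_data', False, transform=test_transform, download=True)
```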
Complete Code
main.py is the same as in the previous complete-code section; just apply the small changes described above.
ResNet.py
```python
import torch
from torch import nn
import torch.nn.functional as F


class ResBlk(nn.Module):
    '''
    ResNet Block
    '''
    def __init__(self, ch_in, ch_out, stride=1):
        '''
        :param ch_in:
        :param ch_out:
        '''
        super(ResBlk, self).__init__()

        self.conv1 = nn.Conv2d(ch_in, ch_out, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(ch_out)
        self.conv2 = nn.Conv2d(ch_out, ch_out, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(ch_out)

        self.extra = nn.Sequential()
        if ch_out != ch_in:
            self.extra = nn.Sequential(
                nn.Conv2d(ch_in, ch_out, kernel_size=1, stride=stride),
                nn.BatchNorm2d(ch_out)
            )

    def forward(self, x):
        '''
        :param x: [batchsz, ch, h, w]
        :return:
        '''
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = self.extra(x) + out
        return out


class ResNet18(nn.Module):

    def __init__(self):
        super(ResNet18, self).__init__()

        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=3, padding=0),
            nn.BatchNorm2d(64)
        )
        self.blk1 = ResBlk(64, 128, stride=2)
        self.blk2 = ResBlk(128, 256, stride=2)
        self.blk3 = ResBlk(256, 512, stride=2)
        self.blk4 = ResBlk(512, 512, stride=2)

        self.outlayer = nn.Linear(512, 10)

    def forward(self, x):
        '''
        :param x:
        :return:
        '''
        x = F.relu(self.conv1(x))

        x = self.blk1(x)
        x = self.blk2(x)
        x = self.blk3(x)
        x = self.blk4(x)

        x = F.adaptive_max_pool2d(x, [1, 1])
        x = x.view(x.size(0), -1)
        x = self.outlayer(x)

        return x


def main():
    tmp = torch.randn(2, 3, 32, 32)

    model = ResNet18()
    blk = ResBlk(3, 128, stride=4)
    out = blk(tmp)

    out = model(tmp)
    print(out.shape)


if __name__ == '__main__':
    main()
```