在前面的学习中,我们已经较为熟练的掌握了不同种类神经网络的原理和基本PyTorch实现,但是之前我们所使用的数据集多为PyTorch中现成的,并没有自己提取数据的这一过程,所以本篇博客旨在采用自定义数据集,帮助大家体验一次包含数据提取,数据训练,数据测试的完整的神经网络数据处理过程。
Pokemon 数据集
数据集基本信息
本次实战,采用Pokemon自定义数据集,数据集中包含5种类型的精灵
数据集中所包含的图片的数量分别是:
皮卡丘 :234
超梦: 239
杰尼龟: 223
小火龙: 238
妙蛙种子: 234
数据集划分
实战步骤
数据集加载(Load data)※
创建模型(Build model)
训练和测试(Train and Test)
迁移学习(Transfer Learning)※
数据集加载
这一部分的工作主要就是要继承父类(torch.utils.data.Dataset
)并且实现该类下的两个函数
__len__
:返回样本长度
__getitem__
:取出样本
举个例子:
1 2 3 4 5 6 7 8 9 10 class NumberDataset (Dataset ): def __init__ (self,training=True ): if training: self.samples = list (range (1 ,1001 )) else : self.samples = list (range (1001 ,1501 )) def __len__ (self ): return len (self.samples) def __getitem__ (self, idx ): return self.samples[idx]
了解了数据集PyTorch中的加载方法,下面来介绍一下本次项目需要完成的数据预处理相关操作。
数据预处理步骤(Preprocessing)
Image Resize
Data Argumentation
Normalize
ToTensor
数据集存放文件结构
1 2 3 4 5 6 pokeman ├─bulbasaur ├─charmander ├─mewtwo ├─pikachu └─squirtle
可以发现是5种精灵是分了5个文件夹,我们打开pikachu文件夹,可以看到里面的图片数据是这样的。
这样存放的好处是,使用PyTorch可以直接一行代码导入所有的数据。后面我们会进行讲解。
PyTorch实现
首先是名字类别的映射
name2label
1 2 3 4 5 6 7 8 9 10 11 12 13 def __init__ (self, root, resize, mode ): super (Pokemon, self).__init__() self.root = root self.resize = resize self.name2label={} for name in sorted (os.listdir((os.path.join(root)))): if not os.path.isdir(os.path.join(root,name)): continue self.name2label[name] = len (self.name2label.keys()) print (self.name2label)
这里因为listdir
返回的顺序不稳定,所以返回后增加一个sorted
函数对名字排序,这样就保证了返回的顺序稳定的问题。
结果如下:
1 {'bulbasaur': 0, 'charmander': 1, 'mewtwo': 2, 'pikachu': 3, 'squirtle': 4}
注意__getitem__
重写的时候要将图片里面的信息提取出来,而不是图片的路径!!!
load_csv
加载文件信息
这个函数也是写在我们自己的Class中的,目的是根据一个数据集加载一个数据集信息的csv文件(如果没有就先创建一个然后再加载),文件中存储每一个数据的存放位置以及标签信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 def load_csv (self, filename ): if not os.path.exists(os.path.join(self.root, filename)): images = [] for name in self.name2label.keys(): images += glob.glob(os.path.join(self.root, name, '*.png' )) images += glob.glob(os.path.join(self.root, name, '*.jpg' )) images += glob.glob(os.path.join(self.root, name, '*.jpeg' )) print (len (images), images) random.shuffle(images) with open (os.path.join(self.root, filename), mode='w' , newline='' ) as f: writer = csv.writer(f) for img in images: name = img.split(os.sep)[-2 ] label = self.name2label[name] writer.writerow([img, label]) print ('writen into csv file:' , filename) images, labels = [], [] with open (os.path.join(self.root, filename)) as f: reader = csv.reader(f) for row in reader: img, label = row label = int (label) images.append(img) labels.append(label) assert len (images) == len (labels) return images, labels
glob
是python自己带的一个文件操作相关模块,用它可以查找符合自己目的的文件.该方法返回所有匹配的文件路径列表 (list);该方法需要一个参数用来指定匹配的路径字符串(字符串可以为绝对路径也可以为相对路径),其返回的文件名只包括当前目录里的文件名,不包括子文件夹里的文件。
划分数据集
在__init__
中load_csv
后,我们就可以进行数据集裁剪了。
1 2 3 4 5 6 7 8 9 10 11 12 self.images, self.labels = self.load_csv('images.csv' ) if mode=='train' : self.images = self.images[:int (0.6 *len (self.images))] self.labels = self.labels[:int (0.6 *len (self.labels))] elif mode=='val' : self.images = self.images[int (0.6 *len (self.images)):int (0.8 *len (self.images))] self.labels = self.labels[int (0.6 *len (self.labels)):int (0.8 *len (self.labels))] else : self.images = self.images[int (0.8 *len (self.images)):] self.labels = self.labels[int (0.8 *len (self.labels)):]
其实我更推荐的方法是先将数据集全部读出来,然后再另写一个函数,使用PyTorch提供的subset
方法对数据集进行划分。详情可以见PyTorch技巧中Kfold这一个模块。
数据增强
我们是在取数据的时候对数据进行数据增强操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def __getitem__ (self, idx ): img, label = self.images[idx], self.labels[idx] tf = transforms.Compose([ lambda x:Image.open (x).convert('RGB' ), transforms.Resize((int (self.resize*1.25 ), int (self.resize*1.25 ))), transforms.RandomRotation(15 ), transforms.CenterCrop(self.resize), transforms.ToTensor(), transforms.Normalize(mean=[0.485 , 0.456 , 0.406 ], std=[0.229 , 0.224 , 0.225 ]) ]) img = tf(img) label = torch.tensor(label) return img, label
Class部分代码汇总
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 class Pokemon (Dataset ): def __init__ (self, root, resize, mode ): super (Pokemon, self).__init__() self.root = root self.resize = resize self.name2label = {} for name in sorted (os.listdir(os.path.join(root))): if not os.path.isdir(os.path.join(root, name)): continue self.name2label[name] = len (self.name2label.keys()) self.images, self.labels = self.load_csv('images.csv' ) if mode=='train' : self.images = self.images[:int (0.6 *len (self.images))] self.labels = self.labels[:int (0.6 *len (self.labels))] elif mode=='val' : self.images = self.images[int (0.6 *len (self.images)):int (0.8 *len (self.images))] self.labels = self.labels[int (0.6 *len (self.labels)):int (0.8 *len (self.labels))] else : self.images = self.images[int (0.8 *len (self.images)):] self.labels = self.labels[int (0.8 *len (self.labels)):] def load_csv (self, filename ): if not os.path.exists(os.path.join(self.root, filename)): images = [] for name in self.name2label.keys(): images += glob.glob(os.path.join(self.root, name, '*.png' )) images += glob.glob(os.path.join(self.root, name, '*.jpg' )) images += glob.glob(os.path.join(self.root, name, '*.jpeg' )) print (len (images), images) random.shuffle(images) with open (os.path.join(self.root, filename), mode='w' , newline='' ) as f: writer = csv.writer(f) for img in images: name = img.split(os.sep)[-2 ] label = self.name2label[name] writer.writerow([img, label]) print ('writen into csv file:' , filename) images, labels = [], [] with open (os.path.join(self.root, filename)) as f: reader = csv.reader(f) for row in reader: img, label = row label = int (label) images.append(img) labels.append(label) assert len (images) == len (labels) return images, labels def __len__ (self ): return len (self.images) def denormalize (self, x_hat ): mean = [0.485 , 0.456 , 0.406 ] std = [0.229 , 0.224 , 0.225 ] mean = torch.tensor(mean).unsqueeze(1 ).unsqueeze(1 ) std = torch.tensor(std).unsqueeze(1 ).unsqueeze(1 ) x = x_hat * std + mean return x def __getitem__ (self, idx ): img, label = self.images[idx], self.labels[idx] tf = transforms.Compose([ lambda x:Image.open (x).convert('RGB' ), transforms.Resize((int (self.resize*1.25 ), int (self.resize*1.25 ))), transforms.RandomRotation(15 ), transforms.CenterCrop(self.resize), transforms.ToTensor(), transforms.Normalize(mean=[0.485 , 0.456 , 0.406 ], std=[0.229 , 0.224 , 0.225 ]) ]) img = tf(img) label = torch.tensor(label) return img, label
Visdom验证自定义数据集加载正确性
我们在主函数中写下如下的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import visdomimport timeimport torchvisionviz = visdom.Visdom() db = Pokemon('pokemon' , 64 , 'train' ) x,y = next (iter (db)) print ('sample:' , x.shape, y.shape, y)viz.image(db.denormalize(x), win='sample_x' , opts=dict (title='sample_x' )) loader = DataLoader(db, batch_size=32 , shuffle=True , num_workers=8 ) for x,y in loader: viz.images(db.denormalize(x), nrow=8 , win='batch' , opts=dict (title='batch' )) viz.text(str (y.numpy()), win='label' , opts=dict (title='batch-y' )) time.sleep(10 )
使用Visdom之前一定要在命令行启动Visdom本地服务器,输入以下命令
运行效果是每过10sload32张图出来,一排8个,一共4排。
自定义数据集的部分就完成了,下面就进入到建立模型的阶段了。
建立模型
这一阶段在之前的PyTorch CNN实战部分已经有过详细的讲解了,这里就不赘述了。
本实验使用的ResNet18模型代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class ResNet18 (nn.Module): def __init__ (self, num_class ): super (ResNet18, self).__init__() self.conv1 = nn.Sequential( nn.Conv2d(3 , 16 , kernel_size=3 , stride=3 , padding=0 ), nn.BatchNorm2d(16 ) ) self.blk1 = ResBlk(16 , 32 , stride=3 ) self.blk2 = ResBlk(32 , 64 , stride=3 ) self.blk3 = ResBlk(64 , 128 , stride=2 ) self.blk4 = ResBlk(128 , 256 , stride=2 ) self.outlayer = nn.Linear(256 *3 *3 , num_class) def forward (self, x ): """ :param x: :return: """ x = F.relu(self.conv1(x)) x = self.blk1(x) x = self.blk2(x) x = self.blk3(x) x = self.blk4(x) x = x.view(x.size(0 ), -1 ) x = self.outlayer(x) return x
训练,验证,测试
严格Train,Val,Test模板※
1 2 3 4 5 6 7 8 9 10 11 12 13 for epoch in range (epochs): train(train_db) if epoch%10 ==0 : val_acc = evaluate(val_db) if val_acc is the best: save_ckpt() if out_of_patience(): break load_ckpt test_acc = evaluate(test_db)
以后训练按照以上模板进行书写即可。
本实验中所使用的代码如下所示:
evaluate部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def evalute (model, loader ): model.eval () correct = 0 total = len (loader.dataset) for x,y in loader: x,y = x.to(device), y.to(device) with torch.no_grad(): logits = model(x) pred = logits.argmax(dim=1 ) correct += torch.eq(pred, y).sum ().float ().item() return correct / total
main部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 def main (): model = ResNet18(5 ).to(device) optimizer = optim.Adam(model.parameters(), lr=lr) criteon = nn.CrossEntropyLoss() best_acc, best_epoch = 0 , 0 global_step = 0 viz.line([0 ], [-1 ], win='loss' , opts=dict (title='loss' )) viz.line([0 ], [-1 ], win='val_acc' , opts=dict (title='val_acc' )) for epoch in range (epochs): for step, (x,y) in enumerate (train_loader): x, y = x.to(device), y.to(device) model.train() logits = model(x) loss = criteon(logits, y) optimizer.zero_grad() loss.backward() optimizer.step() viz.line([loss.item()], [global_step], win='loss' , update='append' ) global_step += 1 if epoch % 1 == 0 : val_acc = evalute(model, val_loader) if val_acc> best_acc: best_epoch = epoch best_acc = val_acc torch.save(model.state_dict(), 'best.mdl' ) viz.line([val_acc], [global_step], win='val_acc' , update='append' ) print ('best acc:' , best_acc, 'best epoch:' , best_epoch) model.load_state_dict(torch.load('best.mdl' )) print ('loaded from ckpt!' ) test_acc = evalute(model, test_loader) print ('test acc:' , test_acc)
如果想要实时检测训练情况,那么我们可以使用visdom工具,而不是最后进行matplotlib,实时监控可以在模型出现问题时及时停下来,进行调整。(如上面的代码所示)
目前结果
按照以上步骤下来,我们的模型train下来的loss是非常小的,但是测试集,准确率并没有达到理想的准确率,这说明我们的模型发生了过拟合,发生这样的事情非常的正常,因为我们数据集的规模非常的小,而且种类也不多,对于ResNet18这种较为复杂的神经网络是不够的,很容易出现这样的问题,因此,这个时候就有必要使用迁移学习 解决过拟合的问题了。
迁移学习(Transfer Learning)
简而言之,迁移学习是一种机器学习方法,就是把为任务 A 开发的模型作为初始点,重新使用在为任务 B 开发模型的过程中。
在我们这里,就是在训练好ImageNet数据集的神经网络的基础上,提取其中训练好的网络参数,加载到要训练宝可梦数据集的网络中,然后再对宝可梦数据集进行训练。
这里我们直接使用的是torchvision
中提供好的resnet18
模型
1 from torchvision.models import resnet18
我们要做的就是将其前17层拆下来,然后最后一层接一层我们自己的全连接层进行分类。
使用children()
方法拆下网络的前17层然后传入到我们自己的model中去,然后再接Flatten操作,然后接全连接层。
代码如下所示:(只需要在初始化模型那里改变一点即可,这里就不贴代码汇总了)
1 2 3 4 5 trained_model = resnet18(pretrained=True ) model = nn.Sequential(*list (trained_model.children())[:-1 ], Flatten(), nn.Linear(512 , 5 ) ).to(device)
Flatten层代码:
1 2 3 4 5 6 7 8 class Flatten (nn.Module): def __init__ (self ): super (Flatten, self).__init__() def forward (self, x ): shape = torch.prod(torch.tensor(x.shape[1 :])).item() return x.view(-1 , shape)
结果
最终,train loss在同一水平下,验证集的准确率提高了10%左右,使用迁移学习的效果提升还是非常明显的。