在前面的学习中,我们已经较为熟练的掌握了不同种类神经网络的原理和基本PyTorch实现,但是之前我们所使用的数据集多为PyTorch中现成的,并没有自己提取数据的这一过程,所以本篇博客旨在采用自定义数据集,帮助大家体验一次包含数据提取,数据训练,数据测试的完整的神经网络数据处理过程。

Pokemon 数据集

数据集基本信息

本次实战,采用Pokemon自定义数据集,数据集中包含5种类型的精灵

5种精灵

数据集中所包含的图片的数量分别是:

  • 皮卡丘 :234
  • 超梦: 239
  • 杰尼龟: 223
  • 小火龙: 238
  • 妙蛙种子: 234

数据集划分

数据集划分

实战步骤

  • 数据集加载(Load data)※
  • 创建模型(Build model)
  • 训练和测试(Train and Test)
  • 迁移学习(Transfer Learning)※

数据集加载

这一部分的工作主要就是要继承父类(torch.utils.data.Dataset)并且实现该类下的两个函数

  • __len__:返回样本长度
  • __getitem__:取出样本

举个例子:

1
2
3
4
5
6
7
8
9
10
class NumberDataset(Dataset):
def __init__(self,training=True):
if training:
self.samples = list(range(1,1001))
else:
self.samples = list(range(1001,1501))
def __len__(self):
return len(self.samples)
def __getitem__(self, idx):
return self.samples[idx]

了解了数据集PyTorch中的加载方法,下面来介绍一下本次项目需要完成的数据预处理相关操作。

数据预处理步骤(Preprocessing)

  • Image Resize
    • 224*224 for ResNet 18
  • Data Argumentation
    • Rotate
    • Crop
  • Normalize
    • Mean,std
  • ToTensor

数据集存放文件结构

1
2
3
4
5
6
pokeman
├─bulbasaur
├─charmander
├─mewtwo
├─pikachu
└─squirtle

可以发现是5种精灵是分了5个文件夹,我们打开pikachu文件夹,可以看到里面的图片数据是这样的。

pikachu

这样存放的好处是,使用PyTorch可以直接一行代码导入所有的数据。后面我们会进行讲解。

PyTorch实现

首先是名字类别的映射

name2label

1
2
3
4
5
6
7
8
9
10
11
12
13
def __init__(self, root, resize, mode):
super(Pokemon, self).__init__() # 调用父类初始化函数

self.root = root
self.resize = resize
# 创建类别和标签映射表
self.name2label={}
for name in sorted(os.listdir((os.path.join(root)))):
if not os.path.isdir(os.path.join(root,name)):
continue
self.name2label[name] = len(self.name2label.keys())
# 调试代码
print(self.name2label)

这里因为listdir返回的顺序不稳定,所以返回后增加一个sorted函数对名字排序,这样就保证了返回的顺序稳定的问题。

结果如下:

1
{'bulbasaur': 0, 'charmander': 1, 'mewtwo': 2, 'pikachu': 3, 'squirtle': 4}

注意__getitem__重写的时候要将图片里面的信息提取出来,而不是图片的路径!!!

load_csv加载文件信息

这个函数也是写在我们自己的Class中的,目的是根据一个数据集加载一个数据集信息的csv文件(如果没有就先创建一个然后再加载),文件中存储每一个数据的存放位置以及标签信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def load_csv(self, filename):
# 没有csv数据集信息文件,生成一个
if not os.path.exists(os.path.join(self.root, filename)):
images = []
for name in self.name2label.keys():
# 'pokemon\\mewtwo\\00001.png
images += glob.glob(os.path.join(self.root, name, '*.png'))
images += glob.glob(os.path.join(self.root, name, '*.jpg'))
images += glob.glob(os.path.join(self.root, name, '*.jpeg'))

# 1167, 'pokemon\\bulbasaur\\00000000.png'
print(len(images), images)

random.shuffle(images)
with open(os.path.join(self.root, filename), mode='w', newline='') as f:
writer = csv.writer(f)
for img in images: # 'pokemon\\bulbasaur\\00000000.png'
name = img.split(os.sep)[-2]
label = self.name2label[name]
# 'pokemon\\bulbasaur\\00000000.png', 0
writer.writerow([img, label])
print('writen into csv file:', filename)

# 如果存在csv数据集信息文件,则读取
# read from csv file
images, labels = [], []
with open(os.path.join(self.root, filename)) as f:
reader = csv.reader(f)
for row in reader:
# 'pokemon\\bulbasaur\\00000000.png', 0
img, label = row
label = int(label)

images.append(img)
labels.append(label)

assert len(images) == len(labels)

return images, labels

glob是python自己带的一个文件操作相关模块,用它可以查找符合自己目的的文件.该方法返回所有匹配的文件路径列表(list);该方法需要一个参数用来指定匹配的路径字符串(字符串可以为绝对路径也可以为相对路径),其返回的文件名只包括当前目录里的文件名,不包括子文件夹里的文件。

划分数据集

__init__load_csv后,我们就可以进行数据集裁剪了。

1
2
3
4
5
6
7
8
9
10
11
12
# image, label
self.images, self.labels = self.load_csv('images.csv')

if mode=='train': # 60%
self.images = self.images[:int(0.6*len(self.images))]
self.labels = self.labels[:int(0.6*len(self.labels))]
elif mode=='val': # 20% = 60%->80%
self.images = self.images[int(0.6*len(self.images)):int(0.8*len(self.images))]
self.labels = self.labels[int(0.6*len(self.labels)):int(0.8*len(self.labels))]
else: # 20% = 80%->100%
self.images = self.images[int(0.8*len(self.images)):]
self.labels = self.labels[int(0.8*len(self.labels)):]

其实我更推荐的方法是先将数据集全部读出来,然后再另写一个函数,使用PyTorch提供的subset方法对数据集进行划分。详情可以见PyTorch技巧中Kfold这一个模块。

数据增强

我们是在取数据的时候对数据进行数据增强操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def __getitem__(self, idx):
# idx~[0~len(images)]
# self.images, self.labels
# img: 'pokemon\\bulbasaur\\00000000.png'
# label: 0
img, label = self.images[idx], self.labels[idx]

tf = transforms.Compose([
lambda x:Image.open(x).convert('RGB'), # string path= > image data
transforms.Resize((int(self.resize*1.25), int(self.resize*1.25))),
transforms.RandomRotation(15),
transforms.CenterCrop(self.resize),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])

img = tf(img)
label = torch.tensor(label)


return img, label

Class部分代码汇总

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class Pokemon(Dataset):

def __init__(self, root, resize, mode):
super(Pokemon, self).__init__()

self.root = root
self.resize = resize

self.name2label = {} # "sq...":0
for name in sorted(os.listdir(os.path.join(root))):
if not os.path.isdir(os.path.join(root, name)):
continue

self.name2label[name] = len(self.name2label.keys())

# print(self.name2label)

# image, label
self.images, self.labels = self.load_csv('images.csv')

if mode=='train': # 60%
self.images = self.images[:int(0.6*len(self.images))]
self.labels = self.labels[:int(0.6*len(self.labels))]
elif mode=='val': # 20% = 60%->80%
self.images = self.images[int(0.6*len(self.images)):int(0.8*len(self.images))]
self.labels = self.labels[int(0.6*len(self.labels)):int(0.8*len(self.labels))]
else: # 20% = 80%->100%
self.images = self.images[int(0.8*len(self.images)):]
self.labels = self.labels[int(0.8*len(self.labels)):]





def load_csv(self, filename):

if not os.path.exists(os.path.join(self.root, filename)):
images = []
for name in self.name2label.keys():
# 'pokemon\\mewtwo\\00001.png
images += glob.glob(os.path.join(self.root, name, '*.png'))
images += glob.glob(os.path.join(self.root, name, '*.jpg'))
images += glob.glob(os.path.join(self.root, name, '*.jpeg'))

# 1167, 'pokemon\\bulbasaur\\00000000.png'
print(len(images), images)

random.shuffle(images)
with open(os.path.join(self.root, filename), mode='w', newline='') as f:
writer = csv.writer(f)
for img in images: # 'pokemon\\bulbasaur\\00000000.png'
name = img.split(os.sep)[-2]
label = self.name2label[name]
# 'pokemon\\bulbasaur\\00000000.png', 0
writer.writerow([img, label])
print('writen into csv file:', filename)

# read from csv file
images, labels = [], []
with open(os.path.join(self.root, filename)) as f:
reader = csv.reader(f)
for row in reader:
# 'pokemon\\bulbasaur\\00000000.png', 0
img, label = row
label = int(label)

images.append(img)
labels.append(label)

assert len(images) == len(labels)

return images, labels



def __len__(self):

return len(self.images)


def denormalize(self, x_hat):

mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# x_hat = (x-mean)/std
# x = x_hat*std = mean
# x: [c, h, w]
# mean: [3] => [3, 1, 1]
mean = torch.tensor(mean).unsqueeze(1).unsqueeze(1)
std = torch.tensor(std).unsqueeze(1).unsqueeze(1)
# print(mean.shape, std.shape)
x = x_hat * std + mean

return x


def __getitem__(self, idx):
# idx~[0~len(images)]
# self.images, self.labels
# img: 'pokemon\\bulbasaur\\00000000.png'
# label: 0
img, label = self.images[idx], self.labels[idx]

tf = transforms.Compose([
lambda x:Image.open(x).convert('RGB'), # string path= > image data
transforms.Resize((int(self.resize*1.25), int(self.resize*1.25))),
transforms.RandomRotation(15),
transforms.CenterCrop(self.resize),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])

img = tf(img)
label = torch.tensor(label)


return img, label

其中denormalize函数做的工作是反正则化

Visdom验证自定义数据集加载正确性

我们在主函数中写下如下的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import  visdom
import time
import torchvision

viz = visdom.Visdom()
db = Pokemon('pokemon', 64, 'train')

x,y = next(iter(db))
print('sample:', x.shape, y.shape, y)

viz.image(db.denormalize(x), win='sample_x', opts=dict(title='sample_x'))

loader = DataLoader(db, batch_size=32, shuffle=True, num_workers=8)

for x,y in loader:
viz.images(db.denormalize(x), nrow=8, win='batch', opts=dict(title='batch'))
viz.text(str(y.numpy()), win='label', opts=dict(title='batch-y'))

time.sleep(10)

使用Visdom之前一定要在命令行启动Visdom本地服务器,输入以下命令

1
python -m visdom.server

运行效果是每过10sload32张图出来,一排8个,一共4排。

自定义数据集的部分就完成了,下面就进入到建立模型的阶段了。

建立模型

这一阶段在之前的PyTorch CNN实战部分已经有过详细的讲解了,这里就不赘述了。

本实验使用的ResNet18模型代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class ResNet18(nn.Module):

def __init__(self, num_class):
super(ResNet18, self).__init__()

self.conv1 = nn.Sequential(
nn.Conv2d(3, 16, kernel_size=3, stride=3, padding=0),
nn.BatchNorm2d(16)
)
# followed 4 blocks
# [b, 16, h, w] => [b, 32, h ,w]
self.blk1 = ResBlk(16, 32, stride=3)
# [b, 32, h, w] => [b, 64, h, w]
self.blk2 = ResBlk(32, 64, stride=3)
# # [b, 64, h, w] => [b, 128, h, w]
self.blk3 = ResBlk(64, 128, stride=2)
# # [b, 128, h, w] => [b, 256, h, w]
self.blk4 = ResBlk(128, 256, stride=2)

# [b, 256, 7, 7]
self.outlayer = nn.Linear(256*3*3, num_class)

def forward(self, x):
"""
:param x:
:return:
"""
x = F.relu(self.conv1(x))

# [b, 64, h, w] => [b, 1024, h, w]
x = self.blk1(x)
x = self.blk2(x)
x = self.blk3(x)
x = self.blk4(x)

# print(x.shape)
x = x.view(x.size(0), -1)
x = self.outlayer(x)


return x

训练,验证,测试

严格Train,Val,Test模板※

1
2
3
4
5
6
7
8
9
10
11
12
13
for epoch in range(epochs):

train(train_db)

if epoch%10==0: # Val
val_acc = evaluate(val_db)

if val_acc is the best:
save_ckpt() # 保存当前网络参数
if out_of_patience():
break
load_ckpt # 加载网络参数
test_acc = evaluate(test_db) # test

以后训练按照以上模板进行书写即可。

本实验中所使用的代码如下所示:

evaluate部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def evalute(model, loader):
model.eval()

correct = 0
total = len(loader.dataset)

for x,y in loader:
x,y = x.to(device), y.to(device)
with torch.no_grad():
logits = model(x)
pred = logits.argmax(dim=1)
correct += torch.eq(pred, y).sum().float().item()

return correct / total

main部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def main():

model = ResNet18(5).to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
criteon = nn.CrossEntropyLoss()


best_acc, best_epoch = 0, 0 # 保存最优模型参数
global_step = 0
viz.line([0], [-1], win='loss', opts=dict(title='loss'))
viz.line([0], [-1], win='val_acc', opts=dict(title='val_acc'))
for epoch in range(epochs):

for step, (x,y) in enumerate(train_loader):

# x: [b, 3, 224, 224], y: [b]
x, y = x.to(device), y.to(device)

model.train()
logits = model(x)
loss = criteon(logits, y)

optimizer.zero_grad()
loss.backward()
optimizer.step()

viz.line([loss.item()], [global_step], win='loss', update='append')
global_step += 1

if epoch % 1 == 0:

val_acc = evalute(model, val_loader)
if val_acc> best_acc:
best_epoch = epoch
best_acc = val_acc

torch.save(model.state_dict(), 'best.mdl') # 保存最优模型参数

viz.line([val_acc], [global_step], win='val_acc', update='append')

# 结束train后
print('best acc:', best_acc, 'best epoch:', best_epoch)
# 加载最优模型
model.load_state_dict(torch.load('best.mdl'))
print('loaded from ckpt!')
# 进行test
test_acc = evalute(model, test_loader)
print('test acc:', test_acc)


如果想要实时检测训练情况,那么我们可以使用visdom工具,而不是最后进行matplotlib,实时监控可以在模型出现问题时及时停下来,进行调整。(如上面的代码所示)

目前结果

按照以上步骤下来,我们的模型train下来的loss是非常小的,但是测试集,准确率并没有达到理想的准确率,这说明我们的模型发生了过拟合,发生这样的事情非常的正常,因为我们数据集的规模非常的小,而且种类也不多,对于ResNet18这种较为复杂的神经网络是不够的,很容易出现这样的问题,因此,这个时候就有必要使用迁移学习解决过拟合的问题了。

迁移学习(Transfer Learning)

简而言之,迁移学习是一种机器学习方法,就是把为任务 A 开发的模型作为初始点,重新使用在为任务 B 开发模型的过程中。

在我们这里,就是在训练好ImageNet数据集的神经网络的基础上,提取其中训练好的网络参数,加载到要训练宝可梦数据集的网络中,然后再对宝可梦数据集进行训练。

这里我们直接使用的是torchvision中提供好的resnet18模型

1
from torchvision.models import resnet18

此ResNet18是已经训练好的模型!

我们要做的就是将其前17层拆下来,然后最后一层接一层我们自己的全连接层进行分类。

使用children()方法拆下网络的前17层然后传入到我们自己的model中去,然后再接Flatten操作,然后接全连接层。

代码如下所示:(只需要在初始化模型那里改变一点即可,这里就不贴代码汇总了)

1
2
3
4
5
trained_model = resnet18(pretrained=True)
model = nn.Sequential(*list(trained_model.children())[:-1], #[b, 512, 1, 1]
Flatten(), # [b, 512, 1, 1] => [b, 512]
nn.Linear(512, 5)
).to(device)

Flatten层代码:

1
2
3
4
5
6
7
8
class Flatten(nn.Module):

def __init__(self):
super(Flatten, self).__init__()

def forward(self, x):
shape = torch.prod(torch.tensor(x.shape[1:])).item()
return x.view(-1, shape)

结果

最终,train loss在同一水平下,验证集的准确率提高了10%左右,使用迁移学习的效果提升还是非常明显的。


本站由 @anonymity 使用 Stellar 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。