1강 - MNIST Classification 실습
NLP 기초 카테고리에 해당하는 글은
Fastcampus의 '김기현의 딥러닝을 활용한 자연어처리 입문 올인원 패키지 Online'을 바탕으로 제작되었음을 알립니다.
(www.fastcampus.co.kr/data_online_dpnlp)
1. Introduction
1) 필요 모듈
대체적으로 실무에서는 이와 같은 구조로 구현합니다.
1> model.py : Architecture가 정의된 클래스 (Model의 구조를 저장해둔 곳)
2> trainer.py : Model을 학습하기 위한 코드
3> dataloader.py : 데이터를 불러와 전처리를 수행하고, 신경망에 넣기 좋은 형태로 변환
4> train.py : 사용자로부터 hyperparameter을 받아, Model과 Trainer, Loader를 선언하고 학습
5> predict.py : 사용자로부터 Model과 input을 받아, 추론을 수행
cf> train과 inference의 과정의 구조
2) train.py
1> dataloader.py
data를 읽어와서 model에 넣어주기 적합한 형태로 변환
2> trainer.py
model.py(모델 구조)를 가지고
dataloader로부터 mini-batch를 받아서
학습 진행 (forward & backpropagation)
3) predict.py
학습된 model.py(모델 구조)를 가지고
test sample이 들어오면
inference
2. 실습
0) 파일 구조
utils.py에서 MNIST data를 가져오도록 만들었다.
1) model.py
cf> super의 의미
(메소드 오버라이딩으로 부모 메서드를 변형할 수도 있지만)
부모 메서드는 그대로 가져온다. (그래서 부모 메서드에서 기능을 추가하는 오버라이딩을 할 때 사용한다.)
cf> nn.Module의 경우
대체적으로 [1] __init__(self)와 [2] forward(self, x)만 오버라이딩 해도 정상적으로 작동합니다.
import torch
import torch.nn as nn
class ImageClassifier(nn.Module):
def __init__(self, input_size, output_size):
self.input_size = input_size
self.output_size = output_size
super().__init__()
self.layers = nn.Sequential(
nn.Linear(input_size, 500), nn.LeakyReLU(), nn.BatchNorm1d(500),
nn.Linear(500, 400), nn.LeakyReLU(), nn.BatchNorm1d(400),
nn.Linear(400, 300), nn.LeakyReLU(), nn.BatchNorm1d(300),
nn.Linear(300, 200), nn.LeakyReLU(), nn.BatchNorm1d(200),
nn.Linear(200, 100), nn.LeakyReLU(), nn.BatchNorm1d(100),
nn.Linear(100, 50), nn.LeakyReLU(), nn.BatchNorm1d(50),
nn.Linear(50, output_size),
nn.LogSoftmax(dim=-1),
)
def forward(self, x):
# |x| = (batch_size, input_size)
y = self.layers(x)
# |y| = (batch_size, output_size)
return y
1> __init__(self)
필요한 layer(sub-layer)를 정의해둔다.
[1] logsoftmax 연산을 hidden size인 10에 가하기 위해서 dim을 -1로 설정
2> forward(self, x)
실제 forward propagation에 필요한 연산을 진행
2. trainer.py
from copy import deepcopy
import numpy as np
import torch
import torch.nn.functional as F
import torch.optim as optim
class Trainer():
def __init__(self, model, optimizer, crit):
self.model = model
self.optimizer = optimizer
self.crit = crit
super().__init__()
def _train(self, x, y, config):
self.model.train() # 1> train : 학습 전에 항상 학습 모드임을 알려주기
# data shuffle
indices = torch.randperm(x.size(0), device=x.device)
x = torch.index_select(x, dim=0, index=indices).split(config.batch_size, dim=0)
y = torch.index_select(y, dim=0, index=indices).split(config.batch_size, dim=0)
total_loss = 0
for i, (x_i, y_i) in enumerate(zip(x, y)):
y_hat_i = self.model(x_i) # 2> forward propagation
loss_i = self.crit(y_hat_i, y_i.squeeze()) # 3> compute loss
# Initialize the gradients of the model.
# 4> backward propagation
self.optimizer.zero_grad() # optimizer에 이전에 저장된 gradient가 있을까봐 0으로 초기화
loss_i.backward()
self.optimizer.step() # 5> gradient descent
if config.verbose >= 2:
print("Train Iteration(%d/%d): loss=%.4e" % (i + 1, len(x), float(loss_i)))
# Don't forget to detach to prevent memory leak.
total_loss += float(loss_i) # 6> total loss 구하기 (detach)
return total_loss / len(x)
def _validate(self, x, y, config):
# Turn evaluation mode on.
self.model.eval() # 1> eval : [1] 학습 모드임을 알리기
# Turn on the no_grad mode to make more efficintly.
with torch.no_grad(): # 1> eval [2] gradient 계산이 필요 없다.
# Shuffle before begin.
indices = torch.randperm(x.size(0), device=x.device)
x = torch.index_select(x, dim=0, index=indices).split(config.batch_size, dim=0)
y = torch.index_select(y, dim=0, index=indices).split(config.batch_size, dim=0)
total_loss = 0
for i, (x_i, y_i) in enumerate(zip(x, y)):
y_hat_i = self.model(x_i)
loss_i = self.crit(y_hat_i, y_i.squeeze())
if config.verbose >= 2:
print("Valid Iteration(%d/%d): loss=%.4e" % (i + 1, len(x), float(loss_i)))
total_loss += float(loss_i)
return total_loss / len(x)
def train(self, train_data, valid_data, config):
lowest_loss = np.inf
best_model = None
for epoch_index in range(config.n_epochs):
train_loss = self._train(train_data[0], train_data[1], config)
valid_loss = self._validate(valid_data[0], valid_data[1], config)
# You must use deep copy to take a snapshot of current best weights.
if valid_loss <= lowest_loss:
lowest_loss = valid_loss
best_model = deepcopy(self.model.state_dict())
print("Epoch(%d/%d): train_loss=%.4e valid_loss=%.4e lowest_loss=%.4e" % (
epoch_index + 1,
config.n_epochs,
train_loss,
valid_loss,
lowest_loss,
))
# Restore to best model.
self.model.load_state_dict(best_model)
0) __init__(self, model, optimizer, crit)
입력 받은 각 함수의 파라미터(model, optimizer, crit)을 클래스 변수로 저장
(crit은 loss를 의미합니다.)
1) train(self, train_data, valid_data, config)
아래 사진의 구조가 trainer.py입니다. (즉, train 함수를 의미합니다.)
또한, train은
[1] 학습을 진행하는 _train 과
[2] 1epoch의 학습을 완료한 뒤 검증을 진행하는 _validate 로 구성되어 있습니다.
_train 함수와 _validate 함수를 통해 각각의 평균 loss 계산합니다.
2) _train(self, x, y, config)
1> train인 경우 해야할 것
[1] 학습 모드임을 알리기 [self.model.train()]
(항상 학습 전에 model에게 학습 모드라고 알려주어야 합니다.)
—- data 1개마다의 loop 시작 —
2> forward propagation
3> calculate loss
4> backward propagation
[1] self.optimizer.zero_grad() : (optimizer에 이전에 저장한 gradient가 있을까봐) gradient를 0으로 초기화
[2] loss_i.backward() : backward propagation
5> gradient descent [self.optimizer.step()]
6> total_loss 구하기
pytorch는 연산할 때마다 그 즉시 computational graph가 생성됩니다.
⇒ loss_i에는 이제까지 연산한 graph가 붙어있다.
⇒ 만약 그냥 loss_i를 total_loss로 축적하면 모든 loss_i의 graph가 total_loss에 그대로 붙는다.
⇒ 엄청난 memory leak
⇒ 그래서 detach하기 위해 float() 을 이용해 tensor를 float으로 바꿉니다.
3) _validate(self, x, y, config)
1> eval인 경우 해야할 것
[1] 검증 모드 임을 알리기 self.model.eval()
[2] gradient 계산을 하지 않도록 한다. with torch.no\_grad():
(그래야 더 빠르고 메모리를 덜 사용한다.)
2> forward propagation
3> compute loss
3. train.py
1> trainer.py를 가지고 전체 학습을 진행하는 모듈
2> shell을 통해 'python train.py {argument들}` 형태로 실행합니다.
import argparse
import torch
import torch.nn as nn
import torch.optim as optim
from model import ImageClassifier
from trainer import Trainer
from utils import load_mnist
def define_argparser():
p = argparse.ArgumentParser() # 1> 객체 만들고
# 2> argument를 추가합니다.
p.add_argument('--model_fn', required=True)
p.add_argument('--gpu_id', type=int, default=0 if torch.cuda.is_available() else -1)
p.add_argument('--train_ratio', type=float, default=.8)
p.add_argument('--batch_size', type=int, default=64)
p.add_argument('--n_epochs', type=int, default=20)
p.add_argument('--verbose', type=int, default=2)
# 3> config를 만들고
config = p.parse_args()
return config # 4> 이 config는 나중에 main의 argument로 사용
def main(config):
# Set device based on user defined configuration.
# 1> device 설정
device = torch.device('cpu') if config.gpu_id < 0 else torch.device('cuda:%d' % config.gpu_id)
# 2> data load
x, y = load_mnist(is_train=True) # [1] data load
# Reshape tensor to chunk of 1-d vectors.
x = x.view(x.size(0), -1) # [2] data reshape
# [3] data 개수 설정
train_cnt = int(x.size(0) * config.train_ratio)
valid_cnt = x.size(0) - train_cnt
# Shuffle dataset to split into train/valid set.
# [4] data reshape
indices = torch.randperm(x.size(0))
x = torch.index_select(
x,
dim=0,
index=indices
).to(device).split([train_cnt, valid_cnt], dim=0)
y = torch.index_select(
y,
dim=0,
index=indices
).to(device).split([train_cnt, valid_cnt], dim=0)
print("Train:", x[0].shape, y[0].shape)
print("Valid:", x[1].shape, y[1].shape)
model = ImageClassifier(28**2, 10).to(device)
optimizer = optim.Adam(model.parameters())
crit = nn.NLLLoss()
trainer = Trainer(model, optimizer, crit)
trainer.train((x[0], y[0]), (x[1], y[1]), config)
# Save best model weights.
torch.save({
'model': trainer.model.state_dict(),
'config': config,
}, config.model_fn)
if __name__ == '__main__':
config = define_argparser()
main(config)
1) parameter 설정
1> ArgumentParser 객체(parser)를 만들고
2> 이 객체(parser)에 add_argument()
를 이용해서 argument를 추가합니다.
- ArgumentParser.add_argument(name or flags...[, action][, nargs][, const][, default][, type][, choices][, required][, help][, metavar][, dest])
- [1] name or flags : parameter의 이름
- [2] required : 꼭 있어야 하는 argument인가? (필수 parameter라면 True로 설정)
3> 이 객체에 parse_args()
를 사용하여 config를 만듭니다.
4> config를 main 함수의 argument로 사용합니다.
2) main 함수
일단 위의 과정에서 만든 config를 argument로 받습니다.
1> device 설정
- cpu 사용 시 : torch.device('cpu')
- gpu 사용 시 : torch.device('cuda:%d' config.gpu_id)
2> data load
[1] data upload [2] reshape [3] split [4] shuffle
cf> .to(device)
내가 원하는 device로 보내준다.
3> trainer.py 사용
[1] 3가지 요소(model, optimizer, loss)를 불러와야 합니다.
- model (model.py의 ImageClassifier)
- optimizer (optim 패키지에 정의된 optimizer 가져오기)
- loss(criterion) (nn 패키지에 정의된 loss 가져오기)
[2] ⇒ 그리고 이 3가지 요소를 train.Trainer 클래스의 인자로 사용하여 객체를 만든다.
[3] 객체의 train 메소드를 이용해서 학습을 진행한다.
4> model save
cf> state_dict : 구성 요소
- model.state_dict() : layer들
- optimizer.state_dict() : state, param_groups(lr, momentum 등등)
1> torch.save(model.state_dict(), config.model_fn)
2> torch.save( {'model': trainer.model.state_dict(), 'config': config}, config.model_fn)
이처럼 dictionary를 사용해서 config까지 인자로 넣어줄 수 있다.
4. utils.py
1) load_mnist
def load_mnist(is_train=True, flatten=True):
from torchvision import datasets, transforms
dataset = datasets.MNIST(
'../data', train=is_train, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
]),
)
x = dataset.data.float() / 255.
y = dataset.targets
if flatten:
x = x.view(x.size(0), -1)
return x, y
dataloader 형태를 불러옵니다.