AutoEncoder

AutoEncoder는 학습 데이터에 레이블 데이터를 별도로 구축할 필요가 없는, 주어진 데이터만으로 학습이 가능한 비지도 학습 신경망입니다. 엄밀히 말해 입력 데이터가 곧 레이블 데이터가 됩니다. AutoEncoder 신경망은 Encoder와 Decoder라는 2개의 신경망으로 구성됩니다. Encoder는 입력 데이터에서 중요한 정보만을 남기고 신경망의 입장에서 학습시에 중요하지 않다고 판단되는 정보는 제거함으로써 처음 입력 데이터의 크기보다 더 작은 크기의 데이터(z)를 생성해 주는 신경망입니다. Decoder는 Encoder가 생성한 z를 가지고 다시 처음의 입력 이미지로 복원하는 신경망입니다.

Encoder가 생성해 주는 보다 더 작은 크기의 데이터를 잠재 벡터(Latent Vector)이라고 하며, z라고 흔히 표기합니다. 이 z는 GAN의 Generator의 입력 데이터인 z와 그 의미가 동일선상에 놓여 있습니다. 잠재 벡터라고 하는 이유는 어떤 중요한 ‘의미’가 잠재되어 있는 데이터(벡터)이기 때문입니다. 결국 이 z에는 처음 입력 데이터에서 중요한 의미만을 남겨 놓은 것, 압축된 것이라고 할 수 있습니다. 또한 이 z에는 별로 중요하지 않거나 잡음같은 것들이 제거된 데이터라고 할 수 있습니다. 압축 차원에서 보자면 손실 압축입니다. 이러한 AutoEncoder 신경망의 용도는 차원감소, 중요한 의미 추출, 잠재벡터를 통한 복잡한 데이터의 공간상 시각화, 이미지 검색, Segmentation, Super Resolution 등 매우 다양합니다.

이러한 AutoEncoder를 이미지 압축과 복원의 관점에서 CNN 레이어를 사용해 구현함으로써 더욱 구체적인 내용을 정리해 보겠습니다. 딥러닝 라이브러리는 PyTorch를 사용하였습니다. TensorFlow 역시 신경망 구성 레이어는 동일하니 어렵지 않게 변환이 가능할 것입니다.

먼저 필요한 패키지들을 Import 해 둡니다.

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision.datasets as dset
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import random
from torch.utils.data import DataLoader

압축과 복원 대상이 되는 이미지는 Fashion MNIST를 사용하겠습니다.

batch_size = 1024
root = './MNIST_Fashion'
transform = transforms.Compose([transforms.ToTensor()])
train_data = dset.FashionMNIST(root=root, train=True, transform=transform, download=True)
test_data = dset.FashionMNIST(root=root, train=False, transform=transform, download=True)

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=True)

AutoEncoder의 신경망에 대한 클래스를 정의합니다.

z_size = 314

class AutoEncoder(nn.Module):
    def __init__(self):
        super(AutoEncoder, self).__init__()

        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=2, stride=2, bias=False),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=2, stride=2, bias=False),
            nn.LeakyReLU(),
            Reshape((-1,7*7*64)),
            nn.Linear(7*7*64, z_size),
            nn.LeakyReLU(),
        )

        self.decoder = nn.Sequential(
            nn.Linear(z_size, 7*7*64),
            nn.LeakyReLU(),
            Reshape((-1,64,7,7)),
            nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=2, stride=2, bias=False),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(in_channels=32, out_channels=1, kernel_size=2, stride=2, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        out = self.encoder(x)
        out = self.decoder(out)
        return out

위의 코드가 기술하는 신경망의 구성 레이어들 사이로 전달되는 Tensor의 크기를 표기한 그림은 다음과 같습니다.

AutoEncoder 클래스는 encoder와 decoder로 레이어들을 분리해 구성하고 있는 것을 볼 수 있습니다. 이렇게 해두면 추후에 encoder 만을 이용해 잠재 벡터만을 쉽게 얻어낼 수 있습니다. 보시면 encoder와 decoder는 레이어의 구성이 성호 대칭성이 있는 것을 볼 수 있습니다. 또한 앞서 언급 했듯이 AutoEncoder는 입력 데이터가 레이블 데이터로 사용되므로 출력 데이터가 입력 데이터의 텐서 크기와 동일합니다. 중요한 점은 z의 크기인 z_size를 314로 해두었는데, 이는 원본 이미지의 크기 28×28인 784가 약 40%의 크기로 압축된다는 것입니다. 다시 상기하면 이 40% 크기로 압축된 z에는 원본 이미지의 중요한 특징값들이 잠재되어 있고 불필요한 것이라고 판단되는 것들은 제거되어 있다는 것입니다. 물론 이러한 판단은 신경망이 데이터를 통해 스스로 판단합니다.

추가적으로 위의 신경망 클래스의 구성 레이어 중 Reshape라는 Tensor의 크기를 변경해 주는 레이어의 코드는 다음과 같습니다.

class Reshape(nn.Module):
    def __init__(self, shape):
        super(Reshape, self).__init__()
        self.shape = shape

    def forward(self, x):
        return x.view(*self.shape)

다음은 학습 코드입니다.

num_epochs = 15

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = AutoEncoder().to(device)
loss_func = nn.MSELoss().to(device)
optimizer = optim.Adam(model.parameters())
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer,threshold=0.1, patience=1, mode='min')    

for i in range(num_epochs):
    for _, [image, _] in enumerate(train_loader):
        x = image.to(device)
        y_= image.to(device)
        
        optimizer.zero_grad()
        output = model(x)
        loss = loss_func(output, y_)
        loss.backward()
        optimizer.step()
 
    scheduler.step(loss)      
    print('Epoch: {}, Loss: {}, LR: {}'.format(i, loss.item(), scheduler.optimizer.state_dict()['param_groups'][0]['lr']))

몇차례 실행을 해보니 15 Epoch 정도에서 손실값이 더 이상 감소하지 않는 것을 확인했고 Overfitting을 방지하기 위해 최종적으로 반복 학습수를 15로 지정했습니다. 물론 어떤 신경망은 학습시에 손실값이 한동안 일정값에서 정체 하다가 다시 감소하는 경우가 있으므로 지속적이고 세밀한 관찰이 필요합니다.

학습이 완료되었다면, 그 결과를 시각화합니다.

model.eval()

rows = 4
for c in range(rows):
    plt.subplot(rows, 2, c*2+1)
    rand_idx = random.randint(0, test_data.data.shape[0])
    plt.imshow(test_data.data[rand_idx].view(28,28), cmap='gray')
    plt.axis('off')

    plt.subplot(rows, 2, c*2+2)
    inp = transform(test_data.data[rand_idx].numpy().reshape(28,28)).reshape(1,1,28,28).to(device)
    img = model(inp)
    plt.imshow(img.view(28,28).detach().cpu().numpy(), cmap='gray')
    plt.axis('off')

    print(test_data.targets[rand_idx])

plt.show()

결과는 아래와 같습니다.

총 4줄의 이미지들이 표시되는데, 왼쪽은 입력 이미지이고 오른쪽은 AutoEncoder의 Decoder가 생성해낸 이미지입니다.

케라스(Keras)의 get_file 함수

신경망 학습을 위한 데이터 준비를 위해, 인터넷 상의 파일을 다운로드 받아 압축을 풀 경우가 있습니다. 이때 내가 원하는 로컬 경로에 다운로드를 받고, 원하는 서브 디렉토리에 압축을 풀기 위해 다음의 코드가 사용될 수 있습니다.

import tensorflow as tf

path = 'D:/GeoAI/20200203'
tf.keras.utils.get_file(path + '/image.zip', 'http://where.net/data.zip', extract=True, cache_subdir='data', cache_dir=path)

결과적으로 인터넷 상에 존재하는 http://where.net/data.zip을 D:/GeoAI/20200203 디렉토리에 image.zip 파일명으로 다운로드 받고, 압축은 D:/GeoAI/20200203/data에 풀게 됩니다.

추가적으로 아래의 코드는 압축이 풀린 파일명 리스트를 얻는 코드입니다.

import pathlib
file_path = pathlib.Path(path + '/data/images')

file_paths = list(file_path.glob('*/*'))
print(file_paths[:10])

이렇게 얻은 파일들의 경로를 통해, 만약 해당 파일이 이미지 형식이라면 실제 화면상에 표시해 확인할 수 있는데, 해당 코드는 아래와 같습니다.

import matplotlib.pyplot as plt
import os

plt.figure(figsize=(12,12))
for i in range(9):
    plt.subplot(3,3,i+1)
    plt.imshow(plt.imread(file_paths[i]))
    plt.title(os.path.basename(file_paths[i]))
    plt.axis('off')
plt.show()

결과는 다음과 같습니다.

특성값 2개를 입력하여 3가지로 분류하는 신경망

이 글에서 소개하고 있는 소스코드는 사이토 고키가 저술한 “Deep Learning from Scratch”의 첫장에서 소개하고 있는 신경망입니다. 한국어판으로는 한빛미디어의 “밑바닥부터 시작하는 딥러닝2″로도 보실 수 있습니다. 해당 도서의 원저자의 허락 하에 글을 올립니다. 특히 공간상의 좌표와 속성값들에 대한 분류에 대한 내용인지라 GIS 분야에서 흔하게 활용될 수 있는 내용이라고 생각됩니다. 보다 자세한 내용은 해당도서를 추천드립니다.

공간 상의 (x,y)에 대한 하나의 지점에 대해서.. 가재, 물방개, 올챙이인 3가지 종류의 식생이 분포하고 있는 생태계가 있다고 합시다. 즉, 입력되는 특성값은 2개일때 3개 중 한개를 결정해야 합니다. 다시 말해, 이미 채집한 데이터가 있고, 이 데이터를 이용해 예측 모델을 훈련시킬 것이며, 훈련된 모델을 이용하면 임이의 (x,y) 위치에 대해 존재하는 식생이 무엇인지를 예측하고자 합니다. 그럼 먼저 예측 모델을 훈련할때 사용할 채집 데이터가 필요한데, 아래의 load_spiral_data 함수가 이러한 채집 데이터를 생성해 줍니다. 아래의 코드는 load_spiral_data 함수를 이용해 채집 데이터를 생성하고 시각화합니다.

import numpy as np
import matplotlib.pyplot as plt

def load_spiral_data(seed=7777):
    np.random.seed(seed)
    DIM = 2  # 입력 데이터 특성 수
    CLS_NUM = 3  # 분류할 클래스 수
    N = 100  # 클래스 하나 당 샘플 데이터 수

    x = np.zeros((N*CLS_NUM, DIM))
    t = np.zeros((N*CLS_NUM, CLS_NUM), dtype=np.int)

    for j in range(CLS_NUM):
        for i in range(N): # N*j, N*(j+1)):
            rate = i / N
            radius = 1.0*rate
            theta = j*4.0 + 4.0*rate + np.random.randn()*0.2

            ix = N*j + i
            x[ix] = np.array([radius*np.sin(theta),
                              radius*np.cos(theta)]).flatten()
            t[ix, j] = 1

    return x, t

x, t = load_spiral_data()
N = 100
CLS_NUM = 3
markers = ['o', 'x', '^']
for i in range(CLS_NUM):
    plt.scatter(x[i*N:(i+1)*N, 0], x[i*N:(i+1)*N, 1], s=40, marker=markers[i])
plt.show()

결과는 아래와 같습니다.

훈련용 데이터가 준비되었으므로, 이제 예측 모델에 대한 신경망에 대한 코드를 살펴보겠습니다. 아래는 은닉층이 1개인 신경망에 대한 클래스입니다. 은닉층이 1개이구요. 이 클래스의 생성자에 대한 인자는 입력되는 특성값의 개수(input_size)와 은닉층의 뉴런 개수(hidden_size) 그리고 분류 결과 개수(output_size)입니다.

class Net:
    def __init__(self, input_size, hidden_size, output_size):
        I, H, O = input_size, hidden_size, output_size

        W1 = 0.01 * np.random.randn(I, H)
        b1 = np.zeros(H)
        W2 = 0.01 * np.random.randn(H, O)
        b2 = np.zeros(O)

        self.layers = [
            Affine(W1, b1),
            Sigmoid(),
            Affine(W2, b2)
        ]
        self.loss_layer = SoftmaxWithLoss()

        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads

    def predict(self, x):
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def forward(self, x, t):
        score = self.predict(x)
        loss = self.loss_layer.forward(score, t)
        return loss

    def backward(self, dout=1):
        dout = self.loss_layer.backward(dout)
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout

신경망의 뉴런값과 가중치 그리고 편향에 대한 연산을 위해 Affine 클래스가 사용됬으며, 신경망의 기반이 되는 선형 회귀에 비선형 회귀 특성을 부여하는 활성화함수로 Sigmoid 클래스가 사용되었습니다. 아울러 3개 이상의 출력결과를 확률로 해석하고 이를 기반으로 손실값을 계산할 수 있는 Softmax와 Cross Entropy Error를 하나의 합친 SoftmaxWithLoss 클래스가 사용되었습니다.

먼저 Affine, Sigmoid 클래스의 코드는 다음과 같습니다.

class Affine:
    def __init__(self, W, b):
        self.params = [W, b]
        self.grads = [np.zeros_like(W), np.zeros_like(b)]
        self.x = None

    def forward(self, x):
        W, b = self.params
        out = np.dot(x, W) + b
        self.x = x
        return out

    def backward(self, dout):
        W, b = self.params
        dx = np.dot(dout, W.T)
        dW = np.dot(self.x.T, dout)
        db = np.sum(dout, axis=0)

        self.grads[0][...] = dW
        self.grads[1][...] = db
        return dx

class Sigmoid:
    def __init__(self):
        self.params, self.grads = [], []
        self.out = None

    def forward(self, x):
        out = 1 / (1 + np.exp(-x))
        self.out = out
        return out

    def backward(self, dout):
        dx = dout * (1.0 - self.out) * self.out
        return dx

그리고 SoftmaxWithLoss 클래스는 다음과 같습니다.

class SoftmaxWithLoss:
    def __init__(self):
        self.params, self.grads = [], []
        self.y = None  # Softmax의 출력
        self.t = None  # 정답 레이블

    def forward(self, x, t):
        self.t = t
        self.y = softmax(x)

        # 정답 레이블이 원핫 벡터일 경우 정답의 인덱스로 변환
        if self.t.size == self.y.size:
            self.t = self.t.argmax(axis=1)

        loss = cross_entropy_error(self.y, self.t)
        return loss

    def backward(self, dout=1):
        batch_size = self.t.shape[0]

        dx = self.y.copy()
        dx[np.arange(batch_size), self.t] -= 1
        dx *= dout
        dx = dx / batch_size

        return dx

우의 클래스는 Softmax 연산과 Cross Entropy Error 연산을 각각 softmax와 cross_entropy_error 함수를 통해 처리하고 있습니다. 이 두 함수는 아래와 같습니다.

def softmax(x):
    if x.ndim == 2:
        x = x - x.max(axis=1, keepdims=True)
        x = np.exp(x)
        x /= x.sum(axis=1, keepdims=True)
    elif x.ndim == 1:
        x = x - np.max(x)
        x = np.exp(x) / np.sum(np.exp(x))

    return x

def cross_entropy_error(y, t):
    if y.ndim == 1:
        t = t.reshape(1, t.size)
        y = y.reshape(1, y.size)
        
    # 정답 데이터가 원핫 벡터일 경우 정답 레이블 인덱스로 변환
    if t.size == y.size:
        t = t.argmax(axis=1)
             
    batch_size = y.shape[0]

    return -np.sum(np.log(y[np.arange(batch_size), t] + 1e-7)) / batch_size

학습 데이터와 모델까지 준비되었으므로, 이제 실제 학습에 대한 코드를 살펴보겠습니다. 먼저 학습을 위한 하이퍼파라메터 및 모델생성 그리고 기타 변수값들입니다.

# 하이퍼파라미터
max_epoch = 300
batch_size = 30
hidden_size = 10
learning_rate = 1.0

# 모델
model = Net(input_size=2, hidden_size=hidden_size, output_size=3)

# 가중치 최적화를 위한 옵티마이저
optimizer = SGD(lr=learning_rate)

# 학습에 사용하는 변수
data_size = len(x)
max_iters = data_size // batch_size
total_loss = 0
loss_count = 0
loss_list = []

다음은 실제 학습 수행 코드입니다.

for epoch in range(max_epoch):
    # 데이터 뒤섞기
    idx = np.random.permutation(data_size)
    x = x[idx]
    t = t[idx]

    for iters in range(max_iters):
        batch_x = x[iters*batch_size:(iters+1)*batch_size]
        batch_t = t[iters*batch_size:(iters+1)*batch_size]

        # 기울기를 구해 매개변수 갱신
        loss = model.forward(batch_x, batch_t)
        model.backward()
        optimizer.update(model.params, model.grads)

        total_loss += loss
        loss_count += 1

        # 정기적으로 학습 경과 출력
        if (iters+1) % 10 == 0:
            avg_loss = total_loss / loss_count
            print('| 에폭 %d |  반복 %d / %d | 손실 %.2f'
                  % (epoch + 1, iters + 1, max_iters, avg_loss))
            loss_list.append(avg_loss)
            total_loss, loss_count = 0, 0

출력값으로써 매 학습 단계마다 손실값이 줄어드는 것을 확인할 수 있습니다.

아래는 위의 학습이 완료되면 그 결과를 시각화해주는 코드입니다.

plt.plot(np.arange(len(loss_list)), loss_list, label='train')
plt.xlabel('Loop (x10)')
plt.ylabel('Loss')
plt.show()

결과는 다음과 같습니다.

이제 이 학습된 모델을 이용해 새로운 입력 데이터를 분류하고 이에 대한 결과를 효과적으로 시각해 주는 코드는 아래와 같습니다.

# 경계 영역 플롯
h = 0.001
x_min, x_max = x[:, 0].min() - .1, x[:, 0].max() + .1
y_min, y_max = x[:, 1].min() - .1, x[:, 1].max() + .1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))
X = np.c_[xx.ravel(), yy.ravel()]
score = model.predict(X)
predict_cls = np.argmax(score, axis=1)
Z = predict_cls.reshape(xx.shape)
plt.contourf(xx, yy, Z)
plt.axis('off')

# 데이터점 플롯
x, t = load_spiral_data()
N = 100
CLS_NUM = 3
markers = ['o', 'x', '^']
for i in range(CLS_NUM):
    plt.scatter(x[i*N:(i+1)*N, 0], x[i*N:(i+1)*N, 1], s=40, marker=markers[i])
plt.show()

결과는 아래와 같습니다.

TensorFlow에서 원하는 GPU 지정하기

GPU가 여러개 설치된 컴퓨터에서 TensorFlow를 사용할 경우 일반적으로 가장 첫번째(0)의 GPU를 사용하게 되는데.. 이때 아래의 코드를 통해 두번째(1)의 GPU를 사용하라고 지정할 수 있음

import tensorflow as tf
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "1"