본문 바로가기
DL/NLP

[DL][NLP] Seq2Seq 예제 (Pytorch)

by 어떻게든 되겠지~ 2024. 10. 28.

이전 Seq2Seq 글을 보고오시면 좋습니다 !

 

[DL][NLP] Seq2Seq(Seqence to Sequence) 모델

RNN에 대해 먼저 알고 보면 더 이해가 쉽습니다 !2024.08.26 - [DL/RNN] - [DL][RNN] RNN(Recurrent Neural Network, 순환 신경망) 구조 [DL][RNN] RNN(Recurrent Neural Network, 순환 신경망) 구조앞선 글에서 RNN에 대해 간략

self-objectification.tistory.com

 

1. Encoder

Seq2Seq의 Encoder에서는 Input Sequence의 모든 단어들을 순차적으로 입력받은 뒤 마지막에 이 모든 단어 정보를 압축하여 Context Vector(RNN의 마지막 Hidden State)를 만든다. Context Vector가 만들어 지면 Encoder는 이를 Decoder로 넘겨준다.

예제에서는 RNN 대신 GRU(Gated Recurrent Unit)을 사용하였다.

import torch
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, embed_dim, num_layers):
        super(Encoder, self).__init__()
        self.input_dim = input_dim      # Encoder에서 사용할 입력층 차원(Vocab Size)
        self.hidden_dim = hidden_dim    # Encoder에서 사용할 은닉층 차원
        self.embed_dim = embed_dim      # Encoder에서 사용할 Embedding 계층
        self.num_layers = num_layers    # Encoder에서 사용할 GRU 계층 개수
        
        # Input Vector를 임베딩
        # Embedding 계층 초기화
        self.embedding = nn.Embedding(num_embeddings=self.input_dim,
                                      embedding_dim=self.embed_dim)
        
        # Embedding 차원, 은닉층 차원, GRU 계층 개수를 이용하여 GRU 계층을 초기화
        self.gru = nn.GRU(input_size=self.embed_dim,
                          hidden_size=self.hidden_dim,
                          num_layers=self.num_layers)
        
    def forward(self, src):
        embedded = self.embedding(src).view(1, 1, -1) # 임베딩 처리
        outputs, hidden = self.gru(embedded) # 임베딩 결과를 GRU 모델에 적용
        # 이후 Seq2Seq 모델에서는 hidden state를 Context Vector로 사용
        return outputs, hidden

 

2. Decoder

Seq2Seq의 Decoder는 Encoder에게 넘겨받은 Context Vector를 Decoder RNN Cell의 첫번째 Hidden State($h_0$)으로 사용하여 번역된 단어를 한 개씩 순차적으로 출력한다.

예제에서는 RNN 대신 GRU(Gated Recurrent Unit)을 사용하였습니다.

 

import torch
import torch.nn as nn
import torch.nn.functional as F

class Decoder(nn.Module):
    def __init__(self, output_dim, hidden_dim, embed_dim, num_layers):
        super(Decoder, self).__init__()
        self.output_dim = output_dim    # Encoder의 마지막 Hidden State    
        self.hidden_dim = hidden_dim    # Decoder의 은닉층
        self.embed_dim = embed_dim      # Decoder에서 사용할 Embedding 계층 
        self.num_layers = num_layers    # Dncoder에서 사용할 GRU 계층 개수

        self.embedding = nn.Embedding(num_embeddings=output_dim,
                                      embedding_dim=self.embed_dim)
        
        self.gru = nn.GRU(input_size=self.embed_dim,
                          hidden_size=self.hidden_dim,
                          num_layers=self.num_layers)
        
        self.out = nn.Linear(self.hidden_dim, self.output_dim)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        input = input.view(1, -1)
        embedded = F.relu(self.embedding(input))
        output, hidden = self.gru(embedded, hidden)
        prediction = self.softmax(self.out(output[0]))
        return prediction, hidden

3. Seq2Seq Network

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device, MAX_LENGTH=MAX_LENGTH):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, input_lang, output_lang, teacher_forcing_ratio=0.5):
        input_length = input_lang.size(0)
        target_length = output_lang.shape[0]
        batch_size = output_lang.shape[1]
        vocab_size = self.decoder.output_dim
        outputs = torch.zeros(target_length, batch_size, vocab_size).to(device)

        for i in range(input_length):
            encoder_output, encoder_hidden = self.encoder(input_lang[i])    # 문장의 모든 단어를 인코딩
        decoder_hidden = encoder_hidden.to(device)                  # Encoder의 마지막 은닉층을 Decoder의 은닉층으로 사용
        decoder_input = torch.tensor([SOS_token], device=device)    # 첫 번째 예측 단어 앞에 토큰(SOS) 추가

        for t in range(target_length):
            decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
            outputs[t] = decoder_output
            teacher_force = random.random() < teacher_forcing_ratio
            topv, topi = decoder_output.topk(1)
            
            # teacher force를 활성화하면 목표 단어를 다음 입력으로 사용
            input = (output_lang[t] if teacher_force else topi)
            # teacher force를 활성화하지 않으면 자체 예측 값을 다음 임력으로 사용
            if (teacher_force == False and input.item() == EOS_token):
                break
        return outputs

 

 

4. Data 전처리

데이터는 영어 - 프랑스어 예제 파일을 사용하였습니다.

Tab-delimited Bilingual Sentence Pairs from the Tatoeba Project (Good for Anki and Similar Flashcard Applications)에서 fra-eng 다운받으시고 파일 명을 eng-fra로 저장하시면 됩니다.

라이브러리 호출

from __future__ import unicode_literals, print_function, division
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np
import pandas as pd
import os
import re       # 정규표현식 사용
import random

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

 

데이터 셋을 Dictionary로 바꾸기 위한 클래스

SOS_token = 0
EOS_token = 1
MAX_LENGTH = 20 # Input Sequence의 최대 길기

# Dictionary를 만들기 위한 클래스
class Language:
    def __init__(self):
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0:"SOS", 1:"EOS"}
        self.n_words = 2

    def add_sentence(self, sentence):
        for word in sentence.split(" "):
            self.add_word(word)

    def add_word(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1

Text 데이터 정규화 및 Tensor로 변환하기 위한 함수 정의

def read_file(loc, lang1, lang2):
    df = pd.read_csv(loc, delimiter='\t', header=None, names=[lang1, lang2])
    return df

def normalize_string(df, lang):
    sentence = df[lang].str.lower()
    # a-z, A-Z, ..., ?, !등을 제외하고 모두 공백으로 바꿈
    sentence = sentence.str.replace("[^A-Za-z\s]+", "")
    sentence = sentence.str.normalize("NFD") # 유니코드 정규화 방식
    # Unicode를 ASCII로 변환
    sentence = sentence.str.encode('ascii', errors='ignore').str.decode("utf-8")
    return sentence

def read_sentence(df, lang1, lang2):
    sentence1 = normalize_string(df, lang1)
    sentence2 = normalize_string(df, lang2)
    return sentence1, sentence2

# lang1, lang2 언어에 해당하는 파일을 읽고
def process_data(path,lang1, lang2):
    df = read_file('%s/%s-%s.txt' % (path, lang1, lang2), lang1, lang2) 
    sentence1, sentence2 = read_sentence(df, lang1, lang2)

    # 각 언어에 해당하는 Dictionary 클래스
    input_lang = Language()
    output_lang = Language()
    pairs = []

    for i in range(len(df)):
        if len(sentence1[i].split(" ")) < MAX_LENGTH and len(sentence2[i].split(" ")) < MAX_LENGTH:
            full = [sentence1[i], sentence2[i]]     
            input_lang.add_sentence(sentence1[i])   # Input으로 영어를 사용
            output_lang.add_sentence(sentence2[i])  # Output으로 프랑스어를 사용
            pairs.append(full)                      # pairs에는 Input과 Output이 합쳐진 것을 사용

    return input_lang, output_lang, pairs

# 문장을 단어로 분리하고 Index를 반환
def indexes_from_sentence(lang, sentence):
    return [lang.word2index[word] for word in sentence.split(" ")]
    
# 문장을 단어로 분리하고 Index를 Tensor로 반환
def tensor_from_sentence(lang, sentence):
    indexes = indexes_from_sentence(lang, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

def tensors_from_pair(input_lang, output_lang, pair):
    input_tensor = tensor_from_sentence(input_lang, pair[0])
    target_tensor = tensor_from_sentence(output_lang, pair[1])
    return (input_tensor, target_tensor)

 

5. Model Train 및 Test 함수 정의

Model Train

def train_model(model, input_lang, output_lang, pairs, num_iterations):
    model.train()
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    criterion = nn.NLLLoss()
    total_loss_iterations = 0   
    training_pairs = [tensors_from_pair(input_lang, output_lang, random.choice(pairs)) 
                      for i in range(num_iterations)]

    for iter in range(1, num_iterations+1):
        training_pair = training_pairs[iter-1]
        input_tensor = training_pair[0]
        target_tensor = training_pair[1]

        optimizer.zero_grad()
        loss = 0
        epoch_loss = 0
        outputs = model(input_tensor, target_tensor)
        num_iter = outputs.size(0)

        for ot in range(num_iter):
            loss += criterion(outputs[ot], target_tensor[ot])


        loss.backward()
        optimizer.step()
        epoch_loss = loss.item() / num_iter

        total_loss_iterations += epoch_loss

        if iter%5000 == 0:
            average_loss = total_loss_iterations / 5000
            total_loss_iterations = 0
            print("%d %.4f" %(iter, average_loss))

    return model

Model Test

def evaluate(model, input_lang, output_lang, sentences, max_length=MAX_LENGTH):
    with torch.no_grad():
        input_tensor = tensor_from_sentence(input_lang, sentences[0])   # 입력 문자열을 Tensor로 변환
        output_tensor = tensor_from_sentence(output_lang, sentences[1]) # 출력 문자열을 Tensor로 변환
        decoded_words = []
        output = model(input_tensor, output_tensor)

        for ot in range(output.size(0)):
            topv, topi = output[ot].topk(1) # 각 출력에서 가장 높은 값을 찾아 인덱스를 반환

            # EOS Token을 만나면 평가를 멈춘다
            if topi[0].item() == EOS_token:
                decoded_words.append("<EOS>")
                break
            else:
                decoded_words.append(output_lang.index2word[topi[0].item()])

    return decoded_words

# 훈련 데이터 셋으로부터 임의의 문장을 가져와서 모델 평가
def evaluate_randomly(model, input_lang, output_lang, pairs, n=10):
    for i in range(n):
        # 임의로 문장을 가져온다        
        pair = random.choice(pairs)
        print('Input : {}'.format(pair[0]))
        print('Output : {}'.format(pair[1]))
        output_words = evaluate(model, input_lang, output_lang, pair)
        output_sentence = ' '.join(output_words)
        print('Predicted : {}'.format(output_sentence))

 

6. Model Train

lang1 = "eng"   # Input으로 사용한 영어
lang2 = "fra"   # Output으로 사용할 프랑스어

input_lang, output_lang, pairs = process_data(lang1, lang2)

randomize = random.choice(pairs)
print('random sentence {}'.format(randomize))

input_size = input_lang.n_words
output_size = output_lang.n_words
print("Input : {} Output : {}".format(input_size, output_size))

embed_size = 256
hidden_size = 512
num_layers = 1
num_iteration = 75000


# Encoder에 훈련 데이터 셋을 입력하고 모든 출력과 은닉 상태를 저장
encoder = Encoder(input_size, hidden_size, embed_size, num_layers)
# Decoder에 첫 번째 입력으로 <SOS> 토큰이 제공되고, Encoder의 마지막 은닉 상태가 Decoder의 첫번째 은닉 상태로 제공
decoder = Decoder(output_size, hidden_size, embed_size, num_layers)
model = Seq2Seq(encoder, decoder, device).to(device)

print(encoder)
print(decoder)
print(model)

model = train_model(model, input_lang, output_lang, pairs, num_iteration)
반응형