이전 Seq2Seq 글을 보고오시면 좋습니다 !
1. Encoder
Seq2Seq의 Encoder에서는 Input Sequence의 모든 단어들을 순차적으로 입력받은 뒤 마지막에 이 모든 단어 정보를 압축하여 Context Vector(RNN의 마지막 Hidden State)를 만든다. Context Vector가 만들어 지면 Encoder는 이를 Decoder로 넘겨준다.
예제에서는 RNN 대신 GRU(Gated Recurrent Unit)을 사용하였다.
import torch
import torch.nn as nn
class Encoder(nn.Module):
def __init__(self, input_dim, hidden_dim, embed_dim, num_layers):
super(Encoder, self).__init__()
self.input_dim = input_dim # Encoder에서 사용할 입력층 차원(Vocab Size)
self.hidden_dim = hidden_dim # Encoder에서 사용할 은닉층 차원
self.embed_dim = embed_dim # Encoder에서 사용할 Embedding 계층
self.num_layers = num_layers # Encoder에서 사용할 GRU 계층 개수
# Input Vector를 임베딩
# Embedding 계층 초기화
self.embedding = nn.Embedding(num_embeddings=self.input_dim,
embedding_dim=self.embed_dim)
# Embedding 차원, 은닉층 차원, GRU 계층 개수를 이용하여 GRU 계층을 초기화
self.gru = nn.GRU(input_size=self.embed_dim,
hidden_size=self.hidden_dim,
num_layers=self.num_layers)
def forward(self, src):
embedded = self.embedding(src).view(1, 1, -1) # 임베딩 처리
outputs, hidden = self.gru(embedded) # 임베딩 결과를 GRU 모델에 적용
# 이후 Seq2Seq 모델에서는 hidden state를 Context Vector로 사용
return outputs, hidden
2. Decoder
Seq2Seq의 Decoder는 Encoder에게 넘겨받은 Context Vector를 Decoder RNN Cell의 첫번째 Hidden State($h_0$)으로 사용하여 번역된 단어를 한 개씩 순차적으로 출력한다.
예제에서는 RNN 대신 GRU(Gated Recurrent Unit)을 사용하였습니다.
import torch
import torch.nn as nn
import torch.nn.functional as F
class Decoder(nn.Module):
def __init__(self, output_dim, hidden_dim, embed_dim, num_layers):
super(Decoder, self).__init__()
self.output_dim = output_dim # Encoder의 마지막 Hidden State
self.hidden_dim = hidden_dim # Decoder의 은닉층
self.embed_dim = embed_dim # Decoder에서 사용할 Embedding 계층
self.num_layers = num_layers # Dncoder에서 사용할 GRU 계층 개수
self.embedding = nn.Embedding(num_embeddings=output_dim,
embedding_dim=self.embed_dim)
self.gru = nn.GRU(input_size=self.embed_dim,
hidden_size=self.hidden_dim,
num_layers=self.num_layers)
self.out = nn.Linear(self.hidden_dim, self.output_dim)
self.softmax = nn.LogSoftmax(dim=1)
def forward(self, input, hidden):
input = input.view(1, -1)
embedded = F.relu(self.embedding(input))
output, hidden = self.gru(embedded, hidden)
prediction = self.softmax(self.out(output[0]))
return prediction, hidden
3. Seq2Seq Network
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder, device, MAX_LENGTH=MAX_LENGTH):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.device = device
def forward(self, input_lang, output_lang, teacher_forcing_ratio=0.5):
input_length = input_lang.size(0)
target_length = output_lang.shape[0]
batch_size = output_lang.shape[1]
vocab_size = self.decoder.output_dim
outputs = torch.zeros(target_length, batch_size, vocab_size).to(device)
for i in range(input_length):
encoder_output, encoder_hidden = self.encoder(input_lang[i]) # 문장의 모든 단어를 인코딩
decoder_hidden = encoder_hidden.to(device) # Encoder의 마지막 은닉층을 Decoder의 은닉층으로 사용
decoder_input = torch.tensor([SOS_token], device=device) # 첫 번째 예측 단어 앞에 토큰(SOS) 추가
for t in range(target_length):
decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
outputs[t] = decoder_output
teacher_force = random.random() < teacher_forcing_ratio
topv, topi = decoder_output.topk(1)
# teacher force를 활성화하면 목표 단어를 다음 입력으로 사용
input = (output_lang[t] if teacher_force else topi)
# teacher force를 활성화하지 않으면 자체 예측 값을 다음 임력으로 사용
if (teacher_force == False and input.item() == EOS_token):
break
return outputs
4. Data 전처리
데이터는 영어 - 프랑스어 예제 파일을 사용하였습니다.
Tab-delimited Bilingual Sentence Pairs from the Tatoeba Project (Good for Anki and Similar Flashcard Applications)에서 fra-eng 다운받으시고 파일 명을 eng-fra로 저장하시면 됩니다.
라이브러리 호출
from __future__ import unicode_literals, print_function, division
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import pandas as pd
import os
import re # 정규표현식 사용
import random
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
데이터 셋을 Dictionary로 바꾸기 위한 클래스
SOS_token = 0
EOS_token = 1
MAX_LENGTH = 20 # Input Sequence의 최대 길기
# Dictionary를 만들기 위한 클래스
class Language:
def __init__(self):
self.word2index = {}
self.word2count = {}
self.index2word = {0:"SOS", 1:"EOS"}
self.n_words = 2
def add_sentence(self, sentence):
for word in sentence.split(" "):
self.add_word(word)
def add_word(self, word):
if word not in self.word2index:
self.word2index[word] = self.n_words
self.word2count[word] = 1
self.index2word[self.n_words] = word
self.n_words += 1
else:
self.word2count[word] += 1
Text 데이터 정규화 및 Tensor로 변환하기 위한 함수 정의
def read_file(loc, lang1, lang2):
df = pd.read_csv(loc, delimiter='\t', header=None, names=[lang1, lang2])
return df
def normalize_string(df, lang):
sentence = df[lang].str.lower()
# a-z, A-Z, ..., ?, !등을 제외하고 모두 공백으로 바꿈
sentence = sentence.str.replace("[^A-Za-z\s]+", "")
sentence = sentence.str.normalize("NFD") # 유니코드 정규화 방식
# Unicode를 ASCII로 변환
sentence = sentence.str.encode('ascii', errors='ignore').str.decode("utf-8")
return sentence
def read_sentence(df, lang1, lang2):
sentence1 = normalize_string(df, lang1)
sentence2 = normalize_string(df, lang2)
return sentence1, sentence2
# lang1, lang2 언어에 해당하는 파일을 읽고
def process_data(path,lang1, lang2):
df = read_file('%s/%s-%s.txt' % (path, lang1, lang2), lang1, lang2)
sentence1, sentence2 = read_sentence(df, lang1, lang2)
# 각 언어에 해당하는 Dictionary 클래스
input_lang = Language()
output_lang = Language()
pairs = []
for i in range(len(df)):
if len(sentence1[i].split(" ")) < MAX_LENGTH and len(sentence2[i].split(" ")) < MAX_LENGTH:
full = [sentence1[i], sentence2[i]]
input_lang.add_sentence(sentence1[i]) # Input으로 영어를 사용
output_lang.add_sentence(sentence2[i]) # Output으로 프랑스어를 사용
pairs.append(full) # pairs에는 Input과 Output이 합쳐진 것을 사용
return input_lang, output_lang, pairs
# 문장을 단어로 분리하고 Index를 반환
def indexes_from_sentence(lang, sentence):
return [lang.word2index[word] for word in sentence.split(" ")]
# 문장을 단어로 분리하고 Index를 Tensor로 반환
def tensor_from_sentence(lang, sentence):
indexes = indexes_from_sentence(lang, sentence)
indexes.append(EOS_token)
return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)
def tensors_from_pair(input_lang, output_lang, pair):
input_tensor = tensor_from_sentence(input_lang, pair[0])
target_tensor = tensor_from_sentence(output_lang, pair[1])
return (input_tensor, target_tensor)
5. Model Train 및 Test 함수 정의
Model Train
def train_model(model, input_lang, output_lang, pairs, num_iterations):
model.train()
optimizer = optim.SGD(model.parameters(), lr=0.01)
criterion = nn.NLLLoss()
total_loss_iterations = 0
training_pairs = [tensors_from_pair(input_lang, output_lang, random.choice(pairs))
for i in range(num_iterations)]
for iter in range(1, num_iterations+1):
training_pair = training_pairs[iter-1]
input_tensor = training_pair[0]
target_tensor = training_pair[1]
optimizer.zero_grad()
loss = 0
epoch_loss = 0
outputs = model(input_tensor, target_tensor)
num_iter = outputs.size(0)
for ot in range(num_iter):
loss += criterion(outputs[ot], target_tensor[ot])
loss.backward()
optimizer.step()
epoch_loss = loss.item() / num_iter
total_loss_iterations += epoch_loss
if iter%5000 == 0:
average_loss = total_loss_iterations / 5000
total_loss_iterations = 0
print("%d %.4f" %(iter, average_loss))
return model
Model Test
def evaluate(model, input_lang, output_lang, sentences, max_length=MAX_LENGTH):
with torch.no_grad():
input_tensor = tensor_from_sentence(input_lang, sentences[0]) # 입력 문자열을 Tensor로 변환
output_tensor = tensor_from_sentence(output_lang, sentences[1]) # 출력 문자열을 Tensor로 변환
decoded_words = []
output = model(input_tensor, output_tensor)
for ot in range(output.size(0)):
topv, topi = output[ot].topk(1) # 각 출력에서 가장 높은 값을 찾아 인덱스를 반환
# EOS Token을 만나면 평가를 멈춘다
if topi[0].item() == EOS_token:
decoded_words.append("<EOS>")
break
else:
decoded_words.append(output_lang.index2word[topi[0].item()])
return decoded_words
# 훈련 데이터 셋으로부터 임의의 문장을 가져와서 모델 평가
def evaluate_randomly(model, input_lang, output_lang, pairs, n=10):
for i in range(n):
# 임의로 문장을 가져온다
pair = random.choice(pairs)
print('Input : {}'.format(pair[0]))
print('Output : {}'.format(pair[1]))
output_words = evaluate(model, input_lang, output_lang, pair)
output_sentence = ' '.join(output_words)
print('Predicted : {}'.format(output_sentence))
6. Model Train
lang1 = "eng" # Input으로 사용한 영어
lang2 = "fra" # Output으로 사용할 프랑스어
input_lang, output_lang, pairs = process_data(lang1, lang2)
randomize = random.choice(pairs)
print('random sentence {}'.format(randomize))
input_size = input_lang.n_words
output_size = output_lang.n_words
print("Input : {} Output : {}".format(input_size, output_size))
embed_size = 256
hidden_size = 512
num_layers = 1
num_iteration = 75000
# Encoder에 훈련 데이터 셋을 입력하고 모든 출력과 은닉 상태를 저장
encoder = Encoder(input_size, hidden_size, embed_size, num_layers)
# Decoder에 첫 번째 입력으로 <SOS> 토큰이 제공되고, Encoder의 마지막 은닉 상태가 Decoder의 첫번째 은닉 상태로 제공
decoder = Decoder(output_size, hidden_size, embed_size, num_layers)
model = Seq2Seq(encoder, decoder, device).to(device)
print(encoder)
print(decoder)
print(model)
model = train_model(model, input_lang, output_lang, pairs, num_iteration)
반응형
'DL > NLP' 카테고리의 다른 글
[DL][NLP]Transformer 모델 (1) | 2024.10.29 |
---|---|
[DL][NLP] Transformer Encoder 동작 과정(Multi Head Self Attention, Position-wise Feed Forward Neural Network(FFNN)) (1) | 2024.10.29 |
[DL][NLP] Attention Mechanism(어텐션 메커니즘) (1) | 2024.10.28 |
[DL][NLP] Seq2Seq(Seqence to Sequence) 모델 (5) | 2024.10.28 |