Sequence-to-Sequence(Seq2Seq)
일반적으로 Seq2Seq 모델은 입력 열에 대한 출력 열을 예측하는 신경망 아키텍처의 일종입니다. 주로 기계 번역, Q/A, 텍스트 요약 등에서 많이 사용됩니다. 이번 프로젝트에서는 논리연산을 입력으로, 계산 결과를 출력으로 해석하는 Seq2Seq 모델을 활용해 문제를 해결해보겠습니다.
데이터 생성 및 처리
- 논리연산에 대한 데이터셋을 생성하며, 두 자리 수까지의 숫자 쌍을 사용해 연산 결과를 데이터셋에 포함하도록 구성하였습니다.
# 데이터셋 생성 함수
def generate_dataset(num_samples=10000):
data = {
"x1": [],
"operator": [],
"x2": [],
"input" : [],
"output": []
}
operators = ["AND", "OR", "XOR"]
for _ in range(num_samples):
value1 = random.randint(0, 99)
operator = random.choice(operators)
value2 = random.randint(0, 99)
if operator == "AND":
result = value1 & value2
elif operator == "OR":
result = value1 | value2
elif operator == "XOR":
result = value1 ^ value2
data['x1'].append(str(value1))
data['operator'].append(operator)
data['x2'].append(str(value2))
data['input'].append(f"{value1}{operator}{value2}") # 입력 문자열
data['output'].append(result) # 올바른 연산 결과를 타겟으로 추가
return pd.DataFrame(data)
- 패딩 토큰('#')을 활용해 입출력 데이터의 길이 고정
- seq2seq논문에 근거하에 <sos>, <eos>토큰을 활용
- 문자열을 정수로 변환(One Hot Encoding)
- 입력데이터 : "<sos>" + "최대 2자리의 자릿수" + "AND || OR || XOR" + "최대 2자리의 자릿수" + "<eos>" : 최대 길이 7
- 출력데이터 : "<sos>" + "최대 3자리의 자릿수(01111111)" + "<eos>" : 최대 길이 5
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
class CustomDataset(Dataset):
def __init__(self, df):
self.df = df
self.input_mapping_data = {
'0': 0,
'1': 1,
'2': 2,
'3': 3,
'4': 4,
'5': 5,
'6': 6,
'7': 7,
'8': 8,
'9': 9,
'AND': 10,
'OR' : 11,
'XOR' : 12,
'<sos>' : 13, # Start of Sequence
'<eos>': 14, # End of Sequence
'#': 15 # Padding token
}
self.output_mapping_data = {
'0': 0,
'1': 1,
'2': 2,
'3': 3,
'4': 4,
'5': 5,
'6': 6,
'7': 7,
'8': 8,
'9': 9,
'<sos>': 10, # Start of Sequence
'<eos>': 11, # End of Sequence
'#': 12 # Padding token
}
self.max_input_length = 7
self.max_output_length = 5
def __len__(self):
return len(self.df)
def __getitem__(self,idx):
if torch.is_tensor(idx):
idx = idx.tolist()
input_str = self.df.loc[idx, 'input']
output_str = str(self.df.loc[idx, 'output']) # 문자열로 변환
# 입력 데이터 처리: <sos> 추가
input_data = [self.input_mapping_data['<sos>']] # <sos> 추가
i = 0
while i < len(input_str):
# 3글자 연산자와 2글자 연산자를 모두 체크
if input_str[i:i+3] in self.input_mapping_data: # 'AND', 'XOR' 체크
input_data.append(self.input_mapping_data[input_str[i:i+3]])
i += 3 # 연산자 넘어가기
elif input_str[i:i+2] in self.input_mapping_data: # 'OR' 체크
input_data.append(self.input_mapping_data[input_str[i:i+2]])
i += 2 # 연산자 넘어가기
else:
input_data.append(self.input_mapping_data[input_str[i]])
i += 1 # 숫자 넘어가기
# <eos> 추가 및 패딩
input_data.append(self.input_mapping_data['<eos>']) # <eos> 추가
input_data = input_data[:self.max_input_length] # 최대 길이 제한
input_data += [self.input_mapping_data['#']] * (self.max_input_length - len(input_data)) # 패딩
# 출력 데이터 처리: <sos>와 <eos> 추가
output_data = [self.output_mapping_data['<sos>']] + [self.output_mapping_data[x] for x in output_str] + [self.output_mapping_data['<eos>']]
output_data = output_data[:self.max_output_length] # 최대 길이 제한
output_data += [self.output_mapping_data['#']] * (self.max_output_length - len(output_data)) # 패딩
return torch.tensor(input_data, dtype=torch.long), torch.tensor(output_data, dtype=torch.long)
dataset = CustomDataset(df)
모델 구현
- Encoder : 입력 열을 임베딩하고, 논리연산식의 정보를 문맥 벡터(context vector)로 인코딩하여 반환합니다.
- Decoder : Encoder의 문맥 벡터를 입력으로 받아 논리연산 결과를 타겟으로 예측합니다.
- Seq2Seq : Encoder와 Decoder를 결합하여 전체 모델을 구성합니다.
import torch.nn as nn
import torch.optim as optim
# Encoder
class Encoder(nn.Module):
def __init__(self, input_dim, emb_dim, hidden_dim, n_layers, dropout=0.5):
super().__init__()
self.embedding = nn.Embedding(input_dim, emb_dim) # Embedding layer
self.lstm = nn.LSTM(emb_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True) # RNN layer
self.dropout = nn.Dropout(dropout) # Dropout layer
# Encoder는 입력 벡터(source문장) -> 문맥 벡터(context vector)로 반환
def forward(self, src):
# src : (batch_size, seq_len)
embedded = self.embedding(src) # Convert input tokens to embeddings
# embedded : (batch_size, seq_len, emb_dim)
embedded = self.dropout(embedded) # Apply dropout to the embeddings
output, (hidden, cell) = self.lstm(embedded) # Forward pass through RNN
return hidden, cell
# Decoder
class Decoder(nn.Module):
def __init__(self, output_dim, emb_dim, hidden_dim, n_layers, dropout=0.5):
super().__init__()
self.output_dim = output_dim
self.embedding = nn.Embedding(output_dim, emb_dim) # Embedding layer
self.lstm = nn.LSTM(emb_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True) # RNN layer
self.fc_out = nn.Linear(hidden_dim, output_dim) # Output layer
self.dropout = nn.Dropout(dropout) # Dropout layer
# Decoder는 현재까지 출력된 문장에 대한 정보를 입력으로 받아 타겟 문장으로 반환
def forward(self, input, hidden, cell):
input = self.embedding(input)
input = self.dropout(input) # Apply dropout to the input
output, (hidden, cell) = self.lstm(input, (hidden, cell)) # Forward pass through RNN
prediction = self.fc_out(output) # Predict next token
return prediction, hidden, cell
# Seq2Seq LSTM Model
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder, device):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.device = device
def forward(self, src, trg):
# Encoder를 통과한 문맥 벡터(context vector) 추출
hidden, cell = self.encoder(src)
# Decoder의 최종 결과를 담을 객체 생성
# trg : [batch_size, seq_len]
batch_size = trg.shape[0]
trg_len = trg.shape[1]
# 초기 입력 토큰 (첫 번째 입력은 일반적으로 <sos> 토큰)
input = trg[:, 0].unsqueeze(1) # (batch_size, 1)
outputs = torch.zeros(batch_size, trg_len, self.decoder.output_dim).to(self.device)
for t in range(1, trg_len): # t=1부터 시작하여 <eos>를 출력할 때까지 반복
output, hidden, cell = self.decoder(input, hidden, cell)
outputs[:, t, :] = output.squeeze(1) # (batch_size, 1, output_dim) -> (batch_size, output_dim)
# 다음 입력을 현재 예측값으로 설정
input = output.argmax(2) # 예측 인덱스를 가져옵니다.
return outputs # [batch_size, target_length, num_output_label]
모델 초기화, 손실함수
- Encoder의 최종 Hidden State의 shape와 Decoder의 입력 shape 맞춰야 한다.
INPUT_DIM = len(custom_dataset.input_mapping_data)
OUTPUT_DIM = len(custom_dataset.output_mapping_data)
EMB_DIM = 512
HID_DIM = 512
ENC_LAYERS = 4
DEC_LAYERS = 4
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
encoder = Encoder(INPUT_DIM, EMB_DIM, HID_DIM, ENC_LAYERS).to(DEVICE)
decoder = Decoder(OUTPUT_DIM, EMB_DIM, HID_DIM, DEC_LAYERS).to(DEVICE)
model = Seq2Seq(encoder, decoder, DEVICE).to(DEVICE)
# Define the optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.003)
criterion = nn.CrossEntropyLoss()
Seq2Seq(
(encoder): Encoder(
(embedding): Embedding(16, 512)
(lstm): LSTM(512, 512, num_layers=4, batch_first=True, dropout=0.5)
(dropout): Dropout(p=0.5, inplace=False)
)
(decoder): Decoder(
(embedding): Embedding(13, 512)
(lstm): LSTM(512, 512, num_layers=4, batch_first=True, dropout=0.5)
(fc_out): Linear(in_features=512, out_features=13, bias=True)
(dropout): Dropout(p=0.5, inplace=False)
)
)
모델 훈련
def train_model(model, dataloader, optimizer, criterion, device, num_epochs, clip_value=None):
model.train() # 모델을 학습 모드로 설정
for epoch in range(num_epochs):
epoch_loss = 0
total_samples = 0 # 총 샘플 수
for inputs, targets in dataloader:
inputs, targets = inputs.to(device), targets.to(device)
optimizer.zero_grad() # 기울기 초기화
outputs = model(inputs, targets) # 모델의 예측값 계산
# 모델 출력 결과와 타겟 사이의 손실 계산
outputs = outputs.view(-1, outputs.shape[-1]) # [batch_size * seq_len, output_dim]
targets = targets.view(-1) # [batch_size * seq_len]
loss = criterion(outputs, targets)
loss.backward() # 역전파
# 기울기 클리핑 (clip_value가 주어진 경우에만)
if clip_value is not None:
torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)
optimizer.step() # 최적화 수행
epoch_loss += loss.item()
# 각 에포크의 평균 손실
avg_loss = epoch_loss / len(dataloader)
print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {avg_loss:.4f}")
Epoch 1/100, Loss: 1.9180
Epoch 2/100, Loss: 1.6231
Epoch 3/100, Loss: 1.5526
Epoch 4/100, Loss: 1.4665
Epoch 5/100, Loss: 1.3706
Epoch 6/100, Loss: 1.3087
Epoch 7/100, Loss: 1.2604
Epoch 8/100, Loss: 1.2287
Epoch 9/100, Loss: 1.1916
Epoch 10/100, Loss: 1.1559
...
Epoch 39/100, Loss: 0.6143
Epoch 40/100, Loss: 0.6074
Epoch 41/100, Loss: 0.6047
Epoch 42/100, Loss: 0.6009
Epoch 43/100, Loss: 0.5905
Epoch 44/100, Loss: 0.5914
Epoch 45/100, Loss: 0.5840
Epoch 46/100, Loss: 0.5798
Epoch 47/100, Loss: 0.5765
Epoch 48/100, Loss: 0.5729
Epoch 49/100, Loss: 0.5741
Epoch 50/100, Loss: 0.5679
모델 테스트
- 사용자로부터 최대 2자리의 자릿수를 가진 두 수(x1, x2)를 입력
- 디버깅용 출력
- Encoder를 통해 임베딩된 결과 전체 인덱스를 출력
- Decoder를 통해 최종 문자열 토큰 출력(특수 토큰 포함ex.<sos>,<eos>, #)
- 특수 토큰을 제거한 최종 예측 값 출력
def test_model(model, input_str, input_mapping, output_mapping, max_input_length, max_output_length):
model.eval() # 평가 모드로 전환
input_data = [input_mapping['<sos>']] # <sos>로 시작
i = 0
# 문자열 입력을 인덱스로 변환
while i < len(input_str):
if input_str[i:i+3] in input_mapping:
input_data.append(input_mapping[input_str[i:i+3]])
i += 3
elif input_str[i:i+2] in input_mapping:
input_data.append(input_mapping[input_str[i:i+2]])
i += 2
else:
input_data.append(input_mapping[input_str[i]])
i += 1
input_data.append(input_mapping['<eos>']) # <eos> 추가
input_data += [input_mapping['#']] * (max_input_length - len(input_data)) # 패딩 추가
input_tensor = torch.tensor(input_data, dtype=torch.long).unsqueeze(0).to(model.device) # 배치 차원 추가
hidden, cell = model.encoder(input_tensor) # 인코더에 입력 데이터 전달
# 디코더의 초기 입력과 설정
output_indices = [output_mapping['<sos>']]
output_tensor = torch.tensor([output_indices[-1]], dtype=torch.long).unsqueeze(0).to(model.device) # 첫 <sos> 입력
for step in range(max_output_length):
output, hidden, cell = model.decoder(output_tensor, hidden, cell)
top1 = output.argmax(2).item() # 예측 결과에서 가장 높은 점수의 인덱스 선택
output_indices.append(top1)
output_tensor = torch.tensor([top1], dtype=torch.long).unsqueeze(0).to(model.device)
# 인덱스를 문자열로 변환
# 디버깅용 출력
print("Output Indices:", output_indices) # 전체 인덱스 출력
# 인덱스를 문자열로 변환 (특수 토큰 포함)
output_str = ''.join([k for idx in output_indices for k, v in output_mapping.items() if v == idx])
print("Output String tokens:", output_str) # 최종 문자열 출력
output_str = output_str.replace('<sos>', '').replace('<eos>', '').replace('#', '') # 특수 토큰 제거
print("Output String: ", output_str)
# 예측된 숫자 결과를 반환
return output_str
operators = ["AND", "OR", "XOR"]
input_x1, input_x2 = map(int, input().split())
print(input_x1, input_x2)
for i in range(3):
input_str = f"{input_x1}{operators[i]}{input_x2}"
output_str = test_model(model, input_str, custom_dataset.input_mapping_data, custom_dataset.output_mapping_data, custom_dataset.max_input_length, custom_dataset.max_output_length)
if operators[i] == 'AND':
true = input_x1 & input_x2
elif operators[i] == "OR":
true = input_x1 | input_x2
elif operators[i] == "XOR":
true = input_x1 ^ input_x2
print(f"Input: {input_str} -> True Output: {true}, Predicted Output: {output_str}")
print('==' * 20)
display(df[(df['x1'] == str(input_x1)) & (df['x2'] == str(input_x2))])
for i in range(3):
input_str = f"{input_x2}{operators[i]}{input_x1}"
output_str = test_model(model, input_str, custom_dataset.input_mapping_data, custom_dataset.output_mapping_data, custom_dataset.max_input_length, custom_dataset.max_output_length)
if operators[i] == 'AND':
true = input_x2 & input_x1
elif operators[i] == "OR":
true = input_x2 | input_x1
elif operators[i] == "XOR":
true = input_x2 ^ input_x1
print(f"Input: {input_str} -> True Output: {true}, Predicted 1Output: {output_str}")
print('==' * 20)
display(df[(df['x1'] == str(input_x2)) & (df['x2'] == str(input_x1))])
74 27
74 27
Output Indices: [10, 1, 0, 11, 12, 12]
Output String tokens: <sos>10<eos>##
Output String: 10
Input: 74AND27 -> True Output: 10, Predicted 1Output: 10
========================================
Output Indices: [10, 9, 1, 11, 12, 12]
Output String tokens: <sos>91<eos>##
Output String: 91
Input: 74OR27 -> True Output: 91, Predicted 1Output: 91
========================================
Output Indices: [10, 8, 3, 11, 12, 12]
Output String tokens: <sos>83<eos>##
Output String: 83
Input: 74XOR27 -> True Output: 81, Predicted 1Output: 83
========================================
x1 operator x2 input output
Output Indices: [10, 1, 0, 11, 12, 12]
Output String tokens: <sos>10<eos>##
Output String: 10
Input: 27AND74 -> True Output: 10, Predicted 1Output: 10
========================================
Output Indices: [10, 9, 1, 11, 12, 12]
Output String tokens: <sos>91<eos>##
Output String: 91
Input: 27OR74 -> True Output: 91, Predicted 1Output: 91
========================================
Output Indices: [10, 8, 3, 11, 12, 12]
Output String tokens: <sos>83<eos>##
Output String: 83
Input: 27XOR74 -> True Output: 81, Predicted 1Output: 83
========================================
x1 operator x2 input output
결론
논리연산(AND, OR, XOR)을 Sequence to Sequence Learning을 이용해 데이터 생성 및 처리, 학습된 데이터셋에 포함 여부까지 확인하며 구현해 보았습니다. 이후에는 Seq2Seq 이후에 나온 Transformer 구조를 학습해보고, 새로운 데이터셋에 적용해보려 합니다.
'DL > NLP' 카테고리의 다른 글
Attention Mechanism (0) | 2024.11.18 |
---|---|
Seq2Seq : Sequence to Sequence (0) | 2024.10.14 |
LSTM, GRU 간단 정리 (5) | 2024.10.08 |
RNN (0) | 2024.10.08 |