Seq2Seq를 활용한 논리연산(AND, OR, XOR)

2024. 11. 8. 20:31·DL/NLP
목차
  1. Sequence-to-Sequence(Seq2Seq)

Sequence-to-Sequence(Seq2Seq)

일반적으로 Seq2Seq 모델은 입력 열에 대한 출력 열을 예측하는 신경망 아키텍처의 일종입니다. 주로 기계 번역, Q/A, 텍스트 요약 등에서 많이 사용됩니다. 이번 프로젝트에서는 논리연산을 입력으로, 계산 결과를 출력으로 해석하는 Seq2Seq 모델을 활용해 문제를 해결해보겠습니다.

 

데이터 생성 및 처리

  • 논리연산에 대한 데이터셋을 생성하며, 두 자리 수까지의 숫자 쌍을 사용해 연산 결과를 데이터셋에 포함하도록 구성하였습니다.
# 데이터셋 생성 함수
def generate_dataset(num_samples=10000):
    data = {
        "x1": [],
        "operator": [],
        "x2": [],
        "input" : [],
        "output": []
    }
    operators = ["AND", "OR", "XOR"]

    for _ in range(num_samples):
        value1 = random.randint(0, 99)
        operator = random.choice(operators)
        value2 = random.randint(0, 99)


        if operator == "AND":
            result = value1 & value2
        elif operator == "OR":
            result = value1 | value2
        elif operator == "XOR":
            result = value1 ^ value2

        data['x1'].append(str(value1))
        data['operator'].append(operator)
        data['x2'].append(str(value2))
        data['input'].append(f"{value1}{operator}{value2}")  # 입력 문자열
        data['output'].append(result)  # 올바른 연산 결과를 타겟으로 추가


    return pd.DataFrame(data)
  • 패딩 토큰('#')을 활용해 입출력 데이터의 길이 고정
  • seq2seq논문에 근거하에 <sos>, <eos>토큰을 활용
  • 문자열을 정수로 변환(One Hot Encoding)
    • 입력데이터 : "<sos>" + "최대 2자리의 자릿수" + "AND || OR || XOR" + "최대 2자리의 자릿수" + "<eos>" : 최대 길이 7
    • 출력데이터 : "<sos>" + "최대 3자리의 자릿수(01111111)" + "<eos>" : 최대 길이 5
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

class CustomDataset(Dataset):
    def __init__(self, df):
        self.df = df
        self.input_mapping_data = {
            '0': 0,
            '1': 1,
            '2': 2,
            '3': 3,
            '4': 4,
            '5': 5,
            '6': 6,
            '7': 7,
            '8': 8,
            '9': 9,
            'AND': 10,
            'OR' : 11,
            'XOR' : 12,
            '<sos>' : 13,  # Start of Sequence
            '<eos>': 14,  # End of Sequence
            '#': 15        # Padding token

        }
        self.output_mapping_data  = {
            '0': 0,
            '1': 1,
            '2': 2,
            '3': 3,
            '4': 4,
            '5': 5,
            '6': 6,
            '7': 7,
            '8': 8,
            '9': 9,
            '<sos>': 10,  # Start of Sequence
            '<eos>': 11,  # End of Sequence
            '#': 12       # Padding token
        }
        self.max_input_length = 7
        self.max_output_length = 5

    def __len__(self):
        return len(self.df)

    def __getitem__(self,idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        input_str = self.df.loc[idx, 'input']
        output_str = str(self.df.loc[idx, 'output'])  # 문자열로 변환

        # 입력 데이터 처리: <sos> 추가
        input_data = [self.input_mapping_data['<sos>']]  # <sos> 추가
        i = 0
        while i < len(input_str):
            # 3글자 연산자와 2글자 연산자를 모두 체크
            if input_str[i:i+3] in self.input_mapping_data:  # 'AND', 'XOR' 체크
                input_data.append(self.input_mapping_data[input_str[i:i+3]])
                i += 3  # 연산자 넘어가기
            elif input_str[i:i+2] in self.input_mapping_data:  # 'OR' 체크
                input_data.append(self.input_mapping_data[input_str[i:i+2]])
                i += 2  # 연산자 넘어가기
            else:
                input_data.append(self.input_mapping_data[input_str[i]])
                i += 1  # 숫자 넘어가기

        # <eos> 추가 및 패딩
        input_data.append(self.input_mapping_data['<eos>'])  # <eos> 추가
        input_data = input_data[:self.max_input_length]  # 최대 길이 제한
        input_data += [self.input_mapping_data['#']] * (self.max_input_length - len(input_data))  # 패딩

        # 출력 데이터 처리: <sos>와 <eos> 추가
        output_data = [self.output_mapping_data['<sos>']] + [self.output_mapping_data[x] for x in output_str] + [self.output_mapping_data['<eos>']]
        output_data = output_data[:self.max_output_length]  # 최대 길이 제한
        output_data += [self.output_mapping_data['#']] * (self.max_output_length - len(output_data))  # 패딩

        return torch.tensor(input_data, dtype=torch.long), torch.tensor(output_data, dtype=torch.long)

dataset = CustomDataset(df)

 

모델 구현

  • Encoder : 입력 열을 임베딩하고, 논리연산식의 정보를 문맥 벡터(context vector)로 인코딩하여 반환합니다.
  • Decoder : Encoder의 문맥 벡터를 입력으로 받아 논리연산 결과를 타겟으로 예측합니다.
  • Seq2Seq : Encoder와 Decoder를 결합하여 전체 모델을 구성합니다.
import torch.nn as nn
import torch.optim as optim

# Encoder
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim, n_layers, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)  # Embedding layer
        self.lstm = nn.LSTM(emb_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True)  # RNN layer
        self.dropout = nn.Dropout(dropout)  # Dropout layer

    # Encoder는 입력 벡터(source문장) -> 문맥 벡터(context vector)로 반환
    def forward(self, src):
        # src : (batch_size, seq_len)
        embedded = self.embedding(src)  # Convert input tokens to embeddings
        # embedded : (batch_size, seq_len, emb_dim)

        embedded = self.dropout(embedded)  # Apply dropout to the embeddings
        output, (hidden, cell) = self.lstm(embedded)  # Forward pass through RNN

        return hidden, cell

# Decoder
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim, n_layers, dropout=0.5):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(output_dim, emb_dim)  # Embedding layer
        self.lstm = nn.LSTM(emb_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True)  # RNN layer
        self.fc_out = nn.Linear(hidden_dim, output_dim)  # Output layer
        self.dropout = nn.Dropout(dropout)  # Dropout layer

    # Decoder는 현재까지 출력된 문장에 대한 정보를 입력으로 받아 타겟 문장으로 반환
    def forward(self, input, hidden, cell):
        input = self.embedding(input)
        input = self.dropout(input)  # Apply dropout to the input
        output, (hidden, cell) = self.lstm(input, (hidden, cell))  # Forward pass through RNN
        prediction = self.fc_out(output)  # Predict next token
        return prediction, hidden, cell

# Seq2Seq LSTM Model
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg):
        # Encoder를 통과한 문맥 벡터(context vector) 추출
        hidden, cell = self.encoder(src)

        # Decoder의 최종 결과를 담을 객체 생성
        # trg : [batch_size, seq_len]
        batch_size = trg.shape[0]
        trg_len = trg.shape[1]

        # 초기 입력 토큰 (첫 번째 입력은 일반적으로 <sos> 토큰)
        input = trg[:, 0].unsqueeze(1)  # (batch_size, 1)

        outputs = torch.zeros(batch_size, trg_len, self.decoder.output_dim).to(self.device)

        for t in range(1, trg_len):  # t=1부터 시작하여 <eos>를 출력할 때까지 반복
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t, :] = output.squeeze(1)  # (batch_size, 1, output_dim) -> (batch_size, output_dim)

            # 다음 입력을 현재 예측값으로 설정
            input = output.argmax(2)  # 예측 인덱스를 가져옵니다.

        return outputs  # [batch_size, target_length, num_output_label]

모델 초기화, 손실함수

  • Encoder의 최종 Hidden State의 shape와 Decoder의 입력 shape 맞춰야 한다.
INPUT_DIM = len(custom_dataset.input_mapping_data)
OUTPUT_DIM = len(custom_dataset.output_mapping_data)
EMB_DIM = 512
HID_DIM = 512
ENC_LAYERS = 4
DEC_LAYERS = 4
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

encoder = Encoder(INPUT_DIM, EMB_DIM, HID_DIM, ENC_LAYERS).to(DEVICE)
decoder = Decoder(OUTPUT_DIM, EMB_DIM, HID_DIM, DEC_LAYERS).to(DEVICE)
model = Seq2Seq(encoder, decoder, DEVICE).to(DEVICE)

# Define the optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.003)
criterion = nn.CrossEntropyLoss()

 

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(16, 512)
    (lstm): LSTM(512, 512, num_layers=4, batch_first=True, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(13, 512)
    (lstm): LSTM(512, 512, num_layers=4, batch_first=True, dropout=0.5)
    (fc_out): Linear(in_features=512, out_features=13, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

모델 훈련

def train_model(model, dataloader, optimizer, criterion, device, num_epochs, clip_value=None):
    model.train()  # 모델을 학습 모드로 설정

    for epoch in range(num_epochs):
        epoch_loss = 0
        total_samples = 0  # 총 샘플 수

        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()  # 기울기 초기화
            outputs = model(inputs, targets)  # 모델의 예측값 계산

            # 모델 출력 결과와 타겟 사이의 손실 계산
            outputs = outputs.view(-1, outputs.shape[-1])  # [batch_size * seq_len, output_dim]
            targets = targets.view(-1)  # [batch_size * seq_len]
            loss = criterion(outputs, targets)

            loss.backward()  # 역전파

            # 기울기 클리핑 (clip_value가 주어진 경우에만)
            if clip_value is not None:
                torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)

            optimizer.step()  # 최적화 수행

            epoch_loss += loss.item()

        # 각 에포크의 평균 손실
        avg_loss = epoch_loss / len(dataloader)
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {avg_loss:.4f}")
Epoch 1/100, Loss: 1.9180
Epoch 2/100, Loss: 1.6231
Epoch 3/100, Loss: 1.5526
Epoch 4/100, Loss: 1.4665
Epoch 5/100, Loss: 1.3706
Epoch 6/100, Loss: 1.3087
Epoch 7/100, Loss: 1.2604
Epoch 8/100, Loss: 1.2287
Epoch 9/100, Loss: 1.1916
Epoch 10/100, Loss: 1.1559
...
Epoch 39/100, Loss: 0.6143
Epoch 40/100, Loss: 0.6074
Epoch 41/100, Loss: 0.6047
Epoch 42/100, Loss: 0.6009
Epoch 43/100, Loss: 0.5905
Epoch 44/100, Loss: 0.5914
Epoch 45/100, Loss: 0.5840
Epoch 46/100, Loss: 0.5798
Epoch 47/100, Loss: 0.5765
Epoch 48/100, Loss: 0.5729
Epoch 49/100, Loss: 0.5741
Epoch 50/100, Loss: 0.5679

 

모델 테스트

  • 사용자로부터 최대 2자리의 자릿수를 가진 두 수(x1, x2)를 입력
  • 디버깅용 출력
    • Encoder를 통해 임베딩된 결과 전체 인덱스를 출력
    • Decoder를 통해 최종 문자열 토큰 출력(특수 토큰 포함ex.<sos>,<eos>, #)
    • 특수 토큰을 제거한 최종 예측 값 출력
def test_model(model, input_str, input_mapping, output_mapping, max_input_length, max_output_length):
    model.eval()  # 평가 모드로 전환
    input_data = [input_mapping['<sos>']]  # <sos>로 시작
    i = 0

    # 문자열 입력을 인덱스로 변환
    while i < len(input_str):
        if input_str[i:i+3] in input_mapping:
            input_data.append(input_mapping[input_str[i:i+3]])
            i += 3
        elif input_str[i:i+2] in input_mapping:
            input_data.append(input_mapping[input_str[i:i+2]])
            i += 2
        else:
            input_data.append(input_mapping[input_str[i]])
            i += 1

    input_data.append(input_mapping['<eos>'])  # <eos> 추가
    input_data += [input_mapping['#']] * (max_input_length - len(input_data))  # 패딩 추가
    input_tensor = torch.tensor(input_data, dtype=torch.long).unsqueeze(0).to(model.device)  # 배치 차원 추가

    hidden, cell = model.encoder(input_tensor) # 인코더에 입력 데이터 전달

    # 디코더의 초기 입력과 설정
    output_indices = [output_mapping['<sos>']]
    output_tensor = torch.tensor([output_indices[-1]], dtype=torch.long).unsqueeze(0).to(model.device)  # 첫 <sos> 입력

    for step in range(max_output_length):
        output, hidden, cell = model.decoder(output_tensor, hidden, cell)
        top1 = output.argmax(2).item()  # 예측 결과에서 가장 높은 점수의 인덱스 선택

        output_indices.append(top1)
        output_tensor = torch.tensor([top1], dtype=torch.long).unsqueeze(0).to(model.device)

    # 인덱스를 문자열로 변환
    # 디버깅용 출력
    print("Output Indices:", output_indices)  # 전체 인덱스 출력

    # 인덱스를 문자열로 변환 (특수 토큰 포함)
    output_str = ''.join([k for idx in output_indices for k, v in output_mapping.items() if v == idx])

    print("Output String tokens:", output_str)  # 최종 문자열 출력

    output_str = output_str.replace('<sos>', '').replace('<eos>', '').replace('#', '')  # 특수 토큰 제거
    print("Output String: ", output_str)
    # 예측된 숫자 결과를 반환
    return output_str
operators = ["AND", "OR", "XOR"]

input_x1, input_x2 = map(int, input().split())
print(input_x1, input_x2)

for i in range(3):
    input_str = f"{input_x1}{operators[i]}{input_x2}"
    output_str = test_model(model, input_str, custom_dataset.input_mapping_data, custom_dataset.output_mapping_data, custom_dataset.max_input_length, custom_dataset.max_output_length)

    if operators[i] == 'AND':
        true = input_x1 & input_x2
    elif operators[i] == "OR":
        true = input_x1 | input_x2
    elif operators[i] == "XOR":
        true = input_x1 ^ input_x2

    print(f"Input: {input_str} -> True Output: {true}, Predicted Output: {output_str}")
    print('==' * 20)
display(df[(df['x1'] == str(input_x1)) & (df['x2'] == str(input_x2))])

for i in range(3):
    input_str = f"{input_x2}{operators[i]}{input_x1}"
    output_str = test_model(model, input_str, custom_dataset.input_mapping_data, custom_dataset.output_mapping_data, custom_dataset.max_input_length, custom_dataset.max_output_length)

    if operators[i] == 'AND':
        true = input_x2 & input_x1
    elif operators[i] == "OR":
        true = input_x2 | input_x1
    elif operators[i] == "XOR":
        true = input_x2 ^ input_x1

    print(f"Input: {input_str} -> True Output: {true}, Predicted 1Output: {output_str}")
    print('==' * 20)
display(df[(df['x1'] == str(input_x2)) & (df['x2'] == str(input_x1))])

 

74 27
74 27
Output Indices: [10, 1, 0, 11, 12, 12]
Output String tokens: <sos>10<eos>##
Output String:  10
Input: 74AND27 -> True Output: 10, Predicted 1Output: 10
========================================
Output Indices: [10, 9, 1, 11, 12, 12]
Output String tokens: <sos>91<eos>##
Output String:  91
Input: 74OR27 -> True Output: 91, Predicted 1Output: 91
========================================
Output Indices: [10, 8, 3, 11, 12, 12]
Output String tokens: <sos>83<eos>##
Output String:  83
Input: 74XOR27 -> True Output: 81, Predicted 1Output: 83
========================================
x1	operator	x2	input	output

Output Indices: [10, 1, 0, 11, 12, 12]
Output String tokens: <sos>10<eos>##
Output String:  10
Input: 27AND74 -> True Output: 10, Predicted 1Output: 10
========================================
Output Indices: [10, 9, 1, 11, 12, 12]
Output String tokens: <sos>91<eos>##
Output String:  91
Input: 27OR74 -> True Output: 91, Predicted 1Output: 91
========================================
Output Indices: [10, 8, 3, 11, 12, 12]
Output String tokens: <sos>83<eos>##
Output String:  83
Input: 27XOR74 -> True Output: 81, Predicted 1Output: 83
========================================
x1	operator	x2	input	output

결론

논리연산(AND, OR, XOR)을 Sequence to Sequence Learning을 이용해 데이터 생성 및 처리, 학습된 데이터셋에 포함 여부까지 확인하며 구현해 보았습니다. 이후에는 Seq2Seq 이후에 나온 Transformer 구조를 학습해보고, 새로운 데이터셋에 적용해보려 합니다.

'DL > NLP' 카테고리의 다른 글

Attention Mechanism  (0) 2024.11.18
Seq2Seq : Sequence to Sequence  (0) 2024.10.14
LSTM, GRU 간단 정리  (5) 2024.10.08
RNN  (0) 2024.10.08
  1. Sequence-to-Sequence(Seq2Seq)
'DL/NLP' 카테고리의 다른 글
  • Attention Mechanism
  • Seq2Seq : Sequence to Sequence
  • LSTM, GRU 간단 정리
  • RNN
YAHO_STUDY
YAHO_STUDY
DATA&AI_study.zip
YAHO_CODEDATA&AI_study.zip
  • YAHO_STUDY
    YAHO_CODE
    YAHO_STUDY
  • 전체
    오늘
    어제
    • 분류 전체보기 (57)
      • Paper Review (0)
      • SQL (16)
      • CODING_TEST (21)
      • Time Series (0)
      • DL (20)
        • NLP (5)
        • LLM&RAG (15)
        • Recommender System (0)
      • 최적화 (0)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    runpod
    Lora
    PEFT
    RNN
    coding_test
    양자화
    MySQL
    pagedattention
    fine-tuning
    한 권으로 끝내는 실전 llm 파인튜닝
    DP
    prompt-tuning
    SQL
    Programmers
    hash
    qlora
    Gemma
    graph
    quantization
    boj
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
YAHO_STUDY
Seq2Seq를 활용한 논리연산(AND, OR, XOR)

개인정보

  • 티스토리 홈
  • 포럼
  • 로그인
상단으로

티스토리툴바

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.