AI 모델 학습 및 실행 확인
- AI 모델은 학습이 성공적으로 완료되고 실제 환경에서 의도대로 잘 작동하는지 주기적으로 확인해야 함
- 모델의 성능을 최적화하고, 잠재적인 문제를 조기에 발견하여 해결하는 데 필수적
1. AI 모델 학습 확인 방법
- 모델이 ‘잘 배우고 있는지’ 그리고 ‘학습이 성공적으로 끝났는지’를 확인하는 과정
1.1 학습 과정 모니터링
학습이 진행되는 동안 실시간으로 모델의 상태를 파악하는 것이 중요함
- 손실(Loss) 값 변화 확인
- 손실 값: 모델의 예측과 실제 값 사이의 오차를 나타냄
- 학습이 진행되면서 이 값이 점진적으로 감소하는지 확인해야 함
- 만약 손실 값이 감소하지 않거나 오히려 증가한다면, 학습률(learning rate) 설정이나 모델 구조 등 문제가 있을 수 있음
- 훈련(training) 손실과 검증(validation) 손실을 함께 보며 과소적합(Underfitting) 또는 과대적합(Overfitting) 징후 파악
- 과소적합: 훈련 손실과 검증 손실 모두 높게 유지되는 경우 (모델이 충분히 학습되지 않음)
- 과대적합: 훈련 손실은 낮아지지만 검증 손실은 증가하는 경우 (모델이 훈련 데이터에 너무 맞춰져 새로운 데이터에 취약함)
- 과대적합 또는 과적합으로 부름
- 평가 지표(Metrics) 변화 확인
- 모델의 목적에 맞는 평가 지표가 점진적으로 개선되는지 확인
- 평가지표
- 분류 모델: 정확도(Accuracy), 정밀도(Precision), 재현율(Recall), F1-Score 등
- 회귀 모델: RMSE, MAE 등
- 평가지표
- 손실 값과 마찬가지로 훈련 세트와 검증 세트에서의 지표를 함께 비교하며 일반화 성능을 평가
- 모델의 목적에 맞는 평가 지표가 점진적으로 개선되는지 확인
- 학습 곡선 시각화
- TensorBoard, Matplotlib 등의 도구 사용
- 손실 값과 평가 지표의 변화를 그래프로 시각화하면 학습 과정을 한눈에 파악하기 용이함
- 시각화를 통해 최적의 학습 중단 시점(Early Stopping)을 결정하거나, 모델 개선 방향을 수립할 수 있음
1.2 학습 결과 분석
학습이 완료된 후 모델의 최종 상태를 확인하는 과정
- 테스트 데이터셋으로 최종 평가
- 학습 및 검증 과정에서 사용되지 않은, 모델이 전혀 보지 못했던 테스트 데이터셋을 사용하여 최종 성능을 평가
- 모델의 실제 일반화 능력을 가장 객관적으로 보여줌
- 여기서 얻은 정확도, 정밀도 등의 지표가 목표치를 충족하는지 확인
- 오류 분석 (Error Analysis)
- 테스트 데이터셋에서 모델이 잘못 예측한 샘플들을 분석
- 어떤 유형의 데이터에서 오류가 발생하는지 파악
- 모델의 약점을 보완하거나 추가 학습 데이터 확보에 활용 가능
- 모델 저장 및 버전 관리
- 성공적으로 학습된 모델은 추후 다시 사용할 수 있도록 가중치와 구조를 ‘.h5’, ‘.pth’, ‘.tf’ 등의 형식으로 저장
- 최근 버전의 Tensorflow는 ‘.keras’를 사용할 것을 권장함
- ‘.pth’는 주로 PyTorch에서 사용함
- MLflow, Weights & Biases와 같은 실험 관리 도구 사용
- 모델 버전, 학습 파라미터, 성능 지표 등을 체계적으로 기록하고 관리하면 지속적인 작업 및 개선에 도움이 됨
- 성공적으로 학습된 모델은 추후 다시 사용할 수 있도록 가중치와 구조를 ‘.h5’, ‘.pth’, ‘.tf’ 등의 형식으로 저장
2. AI 모델 실행(추론) 확인 방법
- 학습된 모델이 실제 서비스 환경에서 ‘의도대로 잘 작동하는지’를 확인하는 과정
2.1 실시간 예측 확인
실제 운영 환경에서 새로운 입력에 대한 모델의 동작 점검
- 샘플 데이터 추론 결과 검토
- 새로운 실제 데이터를 모델에 입력하고, 모델이 생성하는 출력(예측 값)을 육안 또는 스크립트로 주기적으로 확인
- 예측 결과가 논리적으로 맞는지, 기대하는 형식으로 나오는지 등을 검토
- 임계값(Threshold) 설정 및 테스트
- 분류 모델의 경우 예측 확률에 대한 임계값 설정이 필요함
- 다양한 임계값에 대해 모델의 행동이 어떻게 변하는지 확인하고 최적의 임계값을 결정
2.2 성능(Performance) 모니터링
모델이 효율적이고 안정적으로 운영되는지 확인
- 추론 시간(Latency) 측정
- 하나의 입력 데이터를 처리하여 예측을 생성하는 데 걸리는 시간을 측정
- 사용자 경험과 직접적으로 연결되므로 서비스 SLA(Service Level Agreement)를 충족하는지 확인해야 함
- 자원 사용량(Resource Utilization) 모니터링
- 모델 실행 시 CPU, GPU, 메모리 등의 자원 사용량 모니터링
- 과도한 자원 사용은 비용 증가나 시스템 불안정으로 이어질 수 있음
- 로컬 환경에서 모델을 구동하는 경우, 제한된 자원에서 얼마나 효율적으로 작동하는지 꾸준히 확인하고 최적화해야 함
- 처리량(Throughput) 측정
- 주어진 시간 동안 모델이 처리할 수 있는 입력 데이터의 양 측정
- 서비스의 부하를 감당할 수 있는지 평가하는 중요 지표
2.3 모델 / 데이터 드리프트 모니터링
시간이 지남에 따라 모델의 성능이 저하될 수 있는 현상을 감지
- 모델 드리프트(Model Drift)
- 시간이 지남에 따라 모델의 예측 성능이 점진적으로 저하되는 현상
- 데이터 드리프트의 결과일 수도 있고, 외부 환경 변화 때문일 수도 있음
- 지속적인 성능 지표 모니터링과 주기적인 재검증을 통해 모델 드리프트를 감지
- 필요시 모델을 재학습하거나 업데이트해야 함
- 데이터 드리프트(Data Drift)
- 실제 서비스에 입력되는 데이터의 분포가 모델 학습 시 사용했던 데이터의 분포와 달라지는 현상
- 예: 신조어 등장, 새로운 트렌드의 콘텐츠 등
- 모델 성능 저하의 주요 원인이 됨
- 주기적으로 입력 데이터의 특성을 분석하여 변화를 감지해야 함
- 실제 서비스에 입력되는 데이터의 분포가 모델 학습 시 사용했던 데이터의 분포와 달라지는 현상
2.4 오류 및 로그 관리
문제가 발생했을 때 신속하게 원인을 파악하기 위함
- 로깅 시스템 구축
- 모델의 모든 예측 결과, 오류 발생 시점, 관련 입력 데이터 등을 상세하게 로그로 기록하는 시스템을 구축
- ELK Stack(Elasticsearch, Logstash, Kibana) 같은 도구를 활용하여 로그를 효과적으로 수집, 저장, 분석할 수 있음
- 알림 시스템 (Alert System)
- 모델의 성능 지표가 특정 임계값 이하로 떨어지거나,
- 자원 사용량이 비정상적으로 증가하는 경우,
- 관리자에게 자동으로 알림을 보내는 시스템 구축
3. AI 모델 학습 및 실행 실습
3.1 MNIST 데이터셋 학습
3.1.1 Tensorflow 기반 DNN 모델
- 모델 학습 및 평가
import tensorflow as tf
tf.random.set_seed(6)
# MNIST 데이터셋 로드
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
# 데이터 전처리
x_train = x_train.reshape(-1, 28*28).astype('float32') / 255.0
x_test = x_test.reshape(-1, 28*28).astype('float32') / 255.0
# 모델 정의
model = tf.keras.Sequential([
tf.keras.layers.Dense(1000, activation='relu'),
tf.keras.layers.Dense(1000, activation='relu'),
tf.keras.layers.Dense(10)
])
# 모델 컴파일
model.compile(
optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy']
)
# 모델 훈련
model.fit(
x_train, y_train,
epochs=5,
batch_size=128,
verbose=1
)
# 모델 평가
_, accuracy = model.evaluate(x_test, y_test)
print(f'Accuracy: {accuracy:.4f}')
# 모델 저장
model.save('model_mnist_dnn_tensorflow.keras')
print('Training finished')
# 학습된 모델을 이용한 실제 예측 확인(시각화)
import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
num_samples = 5
random_indices = np.random.randint(0, len(x_test), num_samples)
model = tf.keras.models.load_model('model_mnist_dnn_tensorflow.keras')
plt.figure(figsize=(25, 3))
for i, idx in enumerate(random_indices):
# 이미지와 실제 레이블
image = x_test[idx]
true_label = y_test[idx]
# 예측
pred = model.predict(np.expand_dims(image, axis=0))[0]
predicted_label = np.argmax(pred)
confidence = np.max(pred) * 100
# 이미지 시각화
plt.subplot(1, num_samples, i+1)
plt.imshow(image.reshape(28, 28), cmap='gray')
# 예측이 맞았는지 색상으로 표시 (초록: 맞음, 빨강: 틀림)
title_color = 'green' if predicted_label == true_label else 'red'
plt.title(f'Pred: {predicted_label}\nReal: {true_label}', color=title_color)
plt.axis('off')
3.1.2 PyTorch 기반 DNN 모델
- 모델 학습 및 평가
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
# 시드 설정
torch.manual_seed(6)
# 하이퍼파라미터 설정
batch_size = 128
learning_rate = 0.001
num_epochs = 5
# 데이터셋 전처리 및 로드
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,)) # MNIST 평균과 표준편차
])
# 학습 및 테스트 데이터 로드
train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, transform=transform, download=True)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# DNN 모델 정의
class DNN(nn.Module):
def __init__(self):
super(DNN, self).__init__()
self.flatten = nn.Flatten()
self.fc1 = nn.Linear(28 * 28, 1000)
self.relu1 = nn.ReLU()
self.fc2 = nn.Linear(1000, 1000)
self.relu2 = nn.ReLU()
self.fc3 = nn.Linear(1000, 10)
def forward(self, x):
x = self.flatten(x)
x = self.fc1(x)
x = self.relu1(x)
x = self.fc2(x)
x = self.relu2(x)
x = self.fc3(x)
return x
# 디바이스 설정 (GPU 사용 가능 시)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'사용 중인 디바이스: {device}')
# 모델 초기화
model = DNN().to(device)
# 손실 함수와 옵티마이저 정의
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 모델 학습
for epoch in range(num_epochs):
model.train()
running_loss = 0.0
for i, (images, labels) in enumerate(train_loader):
images, labels = images.to(device), labels.to(device)
# 순전파, 역전파, 최적화
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
if (i+1) % 100 == 0:
print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {running_loss/100:.4f}')
running_loss = 0.0
# 모델 평가
model.eval()
with torch.no_grad():
correct = 0
total = 0
for images, labels in test_loader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
accuracy = 100 * correct / total
print(f'테스트 정확도: {accuracy:.2f}%')
# 모델 저장
torch.save(model.state_dict(), 'model_mnist_dnn_pytorch.pth')
print('모델이 저장되었습니다!')
# 학습된 모델을 이용한 실제 예측 확인(시각화)
import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
# 테스트셋에서 무작위로 샘플 선택
dataiter = iter(test_loader)
images, labels = next(dataiter)
# 샘플 이미지에 대한 예측
images = images.to(device)
outputs = model(images)
_, predicted = torch.max(outputs, 1)
# 첫 15개 이미지와 예측 결과 시각화
plt.figure(figsize=(15, 5))
for i in range(15):
plt.subplot(3, 5, i+1)
plt.imshow(images[i].cpu().squeeze().numpy(), cmap='gray')
# 예측이 맞았는지 색상으로 표시
title_color = 'green' if predicted[i] == labels[i] else 'red'
plt.title(f'Pred: {predicted[i]}\nReal: {labels[i]}', color=title_color)
plt.axis('off')
3.1.3 Tensorflow 기반 CNN 모델
import tensorflow as tf
from tensorflow.keras import datasets, layers, models
import matplotlib.pyplot as plt
import numpy as np
import warnings
warnings.filterwarnings('ignore')
# 시드 설정
tf.random.set_seed(6)
# MNIST 데이터셋 로드
(x_train, y_train), (x_test, y_test) = datasets.mnist.load_data()
# 데이터 전처리
# CNN 모델을 위해 이미지 형식으로 reshape (샘플 수, 높이, 너비, 채널)
x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255.0
x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255.0
# 레이블을 원-핫 인코딩 (선택사항)
# 원-핫 인코딩을 수행했다면 모델 컴파일 시 손실 함수로 CategoricalCrossentropy를 사용할 것
# 원-핫 인코딩을 수행하지 않았다면 모델 컴파일 시 손실 함수로 SparseCategoricalCrossentropy를 사용할 것
y_train = tf.keras.utils.to_categorical(y_train, 10)
y_test = tf.keras.utils.to_categorical(y_test, 10)
# CNN 모델 구축
model = models.Sequential([
# 첫 번째 합성곱 레이어
layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
layers.MaxPooling2D((2, 2)),
# 두 번째 합성곱 레이어
layers.Conv2D(64, (3, 3), activation='relu'),
layers.MaxPooling2D((2, 2)),
# 세 번째 합성곱 레이어
layers.Conv2D(64, (3, 3), activation='relu'),
# 완전 연결 레이어를 위한 Flatten
layers.Flatten(),
layers.Dense(128, activation='relu'),
layers.Dense(10) # 10개 클래스에 대한 출력
])
# 모델 컴파일
model.compile(
optimizer='adam',
loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
# loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy']
)
# 모델 학습
history = model.fit(
x_train, y_train,
epochs=5,
batch_size=128,
validation_split=0.1,
verbose=1
)
# 모델 평가
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f'테스트 정확도: {test_acc:.4f}')
plt.figure(figsize=(12, 4))
# 정확도(Accuracy) 그래프
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Train')
plt.plot(history.history['val_accuracy'], label='Validation')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
# 손실(Loss) 그래프
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Train')
plt.plot(history.history['val_loss'], label='Validation')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
# 모델 저장
model.save('model_mnist_cnn_tensorflow.keras')
print('모델이 저장되었습니다!')
# 학습된 모델을 이용한 실제 예측 확인(시각화)
num_samples = 5
predictions = model.predict(x_test[:num_samples])
predictions = np.argmax(predictions, axis=1)
plt.figure(figsize=(25, 3))
for i in range(num_samples):
plt.subplot(1, num_samples, i+1)
plt.imshow(x_test[i].reshape(28, 28), cmap='gray')
plt.title(f'Pred: {predictions[i]}\nReal: {y_test[i]}')
plt.axis('off')
3.1.4 PyTorch 기반 CNN 모델
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
# GPU 사용 가능 여부 확인
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'사용 중인 디바이스: {device}')
# 하이퍼파라미터 설정
batch_size = 128
learning_rate = 0.001
num_epochs = 5
# 데이터셋 전처리 및 로드
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,)) # MNIST 데이터셋의 평균과 표준편차
])
# 학습 데이터 로드
train_dataset = torchvision.datasets.MNIST(
root='./data',
train=True,
transform=transform,
download=True
)
# 테스트 데이터 로드
test_dataset = torchvision.datasets.MNIST(
root='./data',
train=False,
transform=transform,
download=True
)
# 데이터 로더 설정
train_loader = DataLoader(
dataset=train_dataset,
batch_size=batch_size,
shuffle=True
)
test_loader = DataLoader(
dataset=test_dataset,
batch_size=batch_size,
shuffle=False
)
# CNN 모델 정의
class CNN(nn.Module):
def __init__(self):
super(CNN, self).__init__()
# 첫 번째 합성곱 레이어
self.conv1 = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2) # 출력 크기: 14x14
)
# 두 번째 합성곱 레이어
self.conv2 = nn.Sequential(
nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2) # 출력 크기: 7x7
)
# 완전 연결 레이어
self.fc1 = nn.Linear(64 * 7 * 7, 128)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
# 합성곱 레이어 통과
x = self.conv1(x)
x = self.conv2(x)
# 텐서 평탄화
x = x.view(x.size(0), -1)
# 완전 연결 레이어 통과
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
# 모델 초기화
model = CNN().to(device)
# 손실 함수와 옵티마이저 정의
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 모델 학습
total_step = len(train_loader)
for epoch in range(num_epochs):
model.train() # 학습 모드 설정
for i, (images, labels) in enumerate(train_loader):
# GPU로 데이터 이동
images = images.to(device)
labels = labels.to(device)
# Forward pass
outputs = model(images)
loss = criterion(outputs, labels)
# Backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (i+1) % 100 == 0:
print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{total_step}], Loss: {loss.item():.4f}')
# 모델 평가
model.eval() # 평가 모드 설정
with torch.no_grad():
correct = 0
total = 0
for images, labels in test_loader:
images = images.to(device)
labels = labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print(f'테스트 정확도: {100 * correct / total:.2f}%')
# 모델 저장
torch.save(model.state_dict(), 'model_mnist_cnn_pytorch.pth')
print('모델이 저장되었습니다!')
# 학습된 모델을 이용한 실제 예측 확인(시각화)
# 테스트셋에서 무작위로 샘플 선택
dataiter = iter(test_loader)
images, labels = next(dataiter)
# 샘플 이미지에 대한 예측
images = images.to(device)
outputs = model(images)
_, predicted = torch.max(outputs, 1)
# 첫 15개 이미지와 예측 결과 시각화
plt.figure(figsize=(15, 5))
for i in range(15):
plt.subplot(3, 5, i+1)
plt.imshow(images[i].cpu().squeeze().numpy(), cmap='gray')
# 예측이 맞았는지 색상으로 표시
title_color = 'green' if predicted[i] == labels[i] else 'red'
plt.title(f'Pred: {predicted[i]}\nReal: {labels[i]}', color=title_color)
plt.axis('off')
3.2 HouseSales 데이터셋 학습
3.2.1 Tensorflow 기반 DNN 모델
import tensorflow as tf
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
def load_house_data(file_path='https://raw.githubusercontent.com/SkyLectures/LectureMaterials/refs/heads/main/datasets/S03-10-02-02_01-kc_house_data.csv'):
# 데이터 읽기
df = pd.read_csv(file_path)
# 필요없는 컬럼 제거
df = df.drop(['id', 'date'], axis=1)
# 결측치 처리
df = df.dropna()
# 이상치 확인 및 제거 (선택적)
# 예: 가격이 너무 높거나 낮은 경우 제거
q_low = df['price'].quantile(0.01)
q_high = df['price'].quantile(0.99)
df = df[(df['price'] > q_low) & (df['price'] < q_high)]
# 특성(X)과 타겟(y) 분리
X = df.drop('price', axis=1)
y = df['price']
# 학습 및 테스트 데이터 분리
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=6)
# 특성 스케일링 (표준화)
scaler_X = StandardScaler()
X_train = scaler_X.fit_transform(X_train)
X_test = scaler_X.transform(X_test)
# 타겟 변수도 스케일링 (회귀 문제에서는 타겟도 스케일링하는 것이 중요)
scaler_y = StandardScaler()
y_train = scaler_y.fit_transform(y_train.values.reshape(-1, 1)).flatten()
y_test = scaler_y.transform(y_test.values.reshape(-1, 1)).flatten()
# NaN 값이 있는지 확인
print("X_train NaN 개수:", np.isnan(X_train).sum())
print("y_train NaN 개수:", np.isnan(y_train).sum())
return X_train, X_test, y_train, y_test, scaler_y
def training(save_path='model_house_sales_dnn_tensorflow.keras'):
# 데이터 로드
X_train, X_test, y_train, y_test, scaler_y = load_house_data()
# 입력 특성 수 확인
input_dim = X_train.shape[1]
# 모델 정의 (더 안정적인 구조)
model = tf.keras.Sequential([
tf.keras.layers.Dense(256, activation='relu', input_shape=(input_dim,),
kernel_initializer='he_normal'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.Dense(128, activation='relu',
kernel_initializer='he_normal'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.Dense(64, activation='relu',
kernel_initializer='he_normal'),
tf.keras.layers.Dense(1) # 회귀 문제이므로 출력층은 하나의 노드
])
# 모델 컴파일 (옵티마이저 변경 및 학습률 감소)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001, clipnorm=1.0) # 그래디언트 클리핑 추가
model.compile(
optimizer=optimizer,
loss='mse', # Mean Squared Error
metrics=['mae'] # Mean Absolute Error
)
# 콜백 추가 (학습이 불안정할 때 조기 종료)
early_stopping = tf.keras.callbacks.EarlyStopping(
monitor='val_loss', patience=10, restore_best_weights=True
)
# 모델 훈련
history = model.fit(
X_train, y_train,
epochs=50, # 더 많은 에폭 설정
batch_size=32, # 배치 사이즈 감소
verbose=1,
validation_split=0.1,
callbacks=[early_stopping]
)
# 모델 평가
loss, mae = model.evaluate(X_test, y_test)
print(f'테스트 손실(스케일링된 데이터): {loss:.2f}')
print(f'테스트 MAE(스케일링된 데이터): {mae:.2f}')
# 원래 스케일로 예측 결과 변환하여 평가
y_pred = model.predict(X_test)
y_pred_original = scaler_y.inverse_transform(y_pred)
y_test_original = scaler_y.inverse_transform(y_test.reshape(-1, 1))
# 원래 스케일에서의 MAE 계산
mae_original = np.mean(np.abs(y_pred_original - y_test_original))
print(f'테스트 MAE(원래 스케일): ${mae_original:.2f}')
# 모델 저장
model.save(save_path)
print('Training finished')
return model, scaler_y
# 직접 실행
model, scaler_y = training()
# 예측 예시
X_train, X_test, y_train, y_test, _ = load_house_data()
# 테스트 데이터 중 일부 샘플에 대해 예측
sample_count = 5
sample_indices = np.random.randint(0, len(X_test), sample_count)
sample_X = X_test[sample_indices]
sample_y = y_test[sample_indices]
predictions = model.predict(sample_X)
# 원래 스케일로 변환
sample_y_original = scaler_y.inverse_transform(sample_y.reshape(-1, 1))
predictions_original = scaler_y.inverse_transform(predictions)
# 결과 출력
print("\n예측 결과 비교:")
print("실제 가격\t\t예측 가격\t\t차이")
print("-" * 60)
for i in range(sample_count):
actual = sample_y_original[i][0]
predicted = predictions_original[i][0]
diff = abs(actual - predicted)
print(f"${actual:.2f}\t\t${predicted:.2f}\t\t${diff:.2f}")
3.2.2 PyTorch 기반 DNN 모델
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# 시드 설정
torch.manual_seed(6)
# 하이퍼파라미터 설정
batch_size = 32
learning_rate = 0.001
num_epochs = 5
# 데이터셋 클래스 정의
class HouseSalesDataset(Dataset):
def __init__(self, features, targets):
self.features = torch.tensor(features, dtype=torch.float32)
self.targets = torch.tensor(targets, dtype=torch.float32).reshape(-1, 1)
def __len__(self):
return len(self.features)
def __getitem__(self, idx):
return self.features[idx], self.targets[idx]
# 데이터 로드 및 전처리 함수
def load_house_data(file_path='https://raw.githubusercontent.com/SkyLectures/LectureMaterials/refs/heads/main/datasets/S03-10-02-02_01-kc_house_data.csv'):
# 데이터 읽기
df = pd.read_csv(file_path)
# 필요없는 컬럼 제거
df = df.drop(['id', 'date'], axis=1)
# 결측치 처리
df = df.dropna()
# 이상치 제거 (가격 기준 상하위 1% 제거)
q_low = df['price'].quantile(0.01)
q_high = df['price'].quantile(0.99)
df = df[(df['price'] > q_low) & (df['price'] < q_high)]
# 특성(X)과 타겟(y) 분리
X = df.drop('price', axis=1)
y = df['price']
# 학습 및 테스트 데이터 분리
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=6)
# 특성 스케일링 (표준화)
scaler_X = StandardScaler()
X_train = scaler_X.fit_transform(X_train)
X_test = scaler_X.transform(X_test)
# 타겟 변수도 스케일링 (회귀 문제에서는 타겟도 스케일링하는 것이 중요)
scaler_y = StandardScaler()
y_train = scaler_y.fit_transform(y_train.values.reshape(-1, 1)).flatten()
y_test = scaler_y.transform(y_test.values.reshape(-1, 1)).flatten()
return X_train, X_test, y_train, y_test, scaler_y
# 신경망 모델 정의
class HousePriceModel(nn.Module):
def __init__(self, input_dim):
super(HousePriceModel, self).__init__()
self.layer1 = nn.Linear(input_dim, 256)
self.batch_norm1 = nn.BatchNorm1d(256)
self.layer2 = nn.Linear(256, 128)
self.batch_norm2 = nn.BatchNorm1d(128)
self.layer3 = nn.Linear(128, 64)
self.batch_norm3 = nn.BatchNorm1d(64)
self.layer4 = nn.Linear(64, 1)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.batch_norm1(self.layer1(x)))
x = self.relu(self.batch_norm2(self.layer2(x)))
x = self.relu(self.batch_norm3(self.layer3(x)))
x = self.layer4(x)
return x
def training():
# 데이터 로드
X_train, X_test, y_train, y_test, scaler_y = load_house_data()
# 디바이스 설정 (GPU 사용 가능 시)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'사용 중인 디바이스: {device}')
# 데이터셋 및 데이터로더 생성
train_dataset = HouseSalesDataset(X_train, y_train)
test_dataset = HouseSalesDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
# 모델 초기화
input_dim = X_train.shape[1]
model = HousePriceModel(input_dim).to(device)
# 손실 함수와 옵티마이저 정의
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 학습 기록 저장용
train_losses = []
# 모델 학습
for epoch in range(num_epochs):
model.train()
running_loss = 0.0
for features, targets in train_loader:
features, targets = features.to(device), targets.to(device)
# Forward pass
outputs = model(features)
loss = criterion(outputs, targets)
# Backward pass and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
running_loss += loss.item()
avg_train_loss = running_loss / len(train_loader)
train_losses.append(avg_train_loss)
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_train_loss:.4f}')
# 모델 평가
model.eval()
with torch.no_grad():
test_loss = 0.0
mae = 0.0
for features, targets in test_loader:
features, targets = features.to(device), targets.to(device)
outputs = model(features)
# MSE 계산
test_loss += criterion(outputs, targets).item()
# MAE 계산
mae += torch.mean(torch.abs(outputs - targets)).item()
avg_test_loss = test_loss / len(test_loader)
avg_mae = mae / len(test_loader)
print(f'테스트 손실(스케일링된 데이터): {avg_test_loss:.4f}')
print(f'테스트 MAE(스케일링된 데이터): {avg_mae:.4f}')
# 원래 스케일로 예측 결과 변환하여 평가
model.eval()
all_targets = []
all_predictions = []
with torch.no_grad():
for features, targets in test_loader:
features, targets = features.to(device), targets.to(device)
outputs = model(features)
all_targets.append(targets.cpu().numpy())
all_predictions.append(outputs.cpu().numpy())
# 예측 결과와 실제 값을 numpy 배열로 변환
all_targets = np.vstack(all_targets)
all_predictions = np.vstack(all_predictions)
# 원래 스케일로 변환
all_targets_original = scaler_y.inverse_transform(all_targets)
all_predictions_original = scaler_y.inverse_transform(all_predictions)
# 원래 스케일에서의 MAE 계산
mae_original = np.mean(np.abs(all_predictions_original - all_targets_original))
print(f'테스트 MAE(원래 스케일): ${mae_original:.2f}')
# 학습 곡선 시각화
plt.figure(figsize=(10, 6))
plt.plot(range(1, num_epochs + 1), train_losses, marker='o')
plt.title('Learning Loss Curve')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.grid(True)
plt.show()
# 예측 결과 시각화
plt.figure(figsize=(10, 6))
plt.scatter(all_targets_original, all_predictions_original, alpha=0.5)
plt.plot([all_targets_original.min(), all_targets_original.max()],
[all_targets_original.min(), all_targets_original.max()], 'r--')
plt.xlabel('Real Price')
plt.ylabel('Predicted Price')
plt.title('Real Price vs Predicted Price')
plt.grid(True)
plt.show()
# 모델 저장
torch.save(model.state_dict(), 'model_house_sales_dnn_pytorch.pth')
print('모델이 저장되었습니다!')
return model, scaler_y
model, scaler_y = training()
# 예측 예시 (몇 가지 샘플에 대한 예측)
def predict_sample(model, scaler_y, num_samples=5):
# 데이터 로드
X_train, X_test, y_train, y_test, _ = load_house_data()
# 샘플 선택
indices = np.random.randint(0, len(X_test), num_samples)
sample_X = X_test[indices]
sample_y = y_test[indices]
# 텐서로 변환
sample_X_tensor = torch.tensor(sample_X, dtype=torch.float32).to(device)
# 예측
model.eval()
with torch.no_grad():
predictions = model(sample_X_tensor)
# 원래 스케일로 변환
sample_y_original = scaler_y.inverse_transform(sample_y.reshape(-1, 1))
predictions_original = scaler_y.inverse_transform(predictions.cpu().numpy())
# 결과 출력
print("\n예측 결과 비교:")
print("실제 가격\t\t예측 가격\t\t차이")
print("-" * 60)
for i in range(num_samples):
actual = sample_y_original[i][0]
predicted = predictions_original[i][0]
diff = abs(actual - predicted)
print(f"${actual:.2f}\t\t${predicted:.2f}\t\t${diff:.2f}")
# 샘플 예측 실행
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
predict_sample(model, scaler_y)