TransformerはNLP向けに開発されたアーキテクチャですが、Self-Attention機構は時系列データの長距離依存関係も捉えることができます。そこで、この記事ではPyTorchでTransformerベースの株価予測モデルを実装し、LSTMとの比較もまとめます。
📘 外部参考:LSTM(Wikipedia) / Keras LSTM(公式)
📘 外部参考:Hugging Face Transformers(公式ドキュメント) / Attention Is All You Need(原論文)
📘 外部参考:PyTorch 公式 / PyTorch チュートリアル(公式)
環境構築
pip install torch torchvision yfinance pandas numpy scikit-learn
データの準備
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader
def get_stock_features(ticker, period='3y'):
    """Download OHLCV data for *ticker* and build a model-ready feature table.

    Parameters
    ----------
    ticker : str
        Yahoo Finance ticker symbol (e.g. ``'NVDA'``).
    period : str
        Lookback period string accepted by ``yfinance`` (default ``'3y'``).

    Returns
    -------
    pandas.DataFrame
        Columns ``Return, MA5, MA20, RSI, Volume_ratio, Volatility, Target``.
        ``Target`` is 1.0 when the *next* day's return is positive, else 0.0;
        rows with any undefined feature or label are dropped.
    """
    df = yf.download(ticker, period=period, progress=False)
    # NOTE(review): recent yfinance versions return MultiIndex columns even
    # for a single ticker; flatten so df['Close'] is a Series. The guard is a
    # no-op on older versions with flat columns.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)
    df['Return'] = df['Close'].pct_change()
    # Relative distance of price from its 5- and 20-day moving averages.
    df['MA5'] = df['Close'].rolling(5).mean() / df['Close'] - 1
    df['MA20'] = df['Close'].rolling(20).mean() / df['Close'] - 1
    # 14-day RSI (simple-moving-average variant), rescaled to [0, 1].
    delta = df['Close'].diff()
    gain = delta.clip(lower=0).rolling(14).mean()
    loss = (-delta.clip(upper=0)).rolling(14).mean()
    df['RSI'] = (100 - 100 / (1 + gain / loss)) / 100
    df['Volume_ratio'] = df['Volume'] / df['Volume'].rolling(20).mean()
    df['Volatility'] = df['Return'].rolling(10).std()
    # Binary label: does the next day close higher? Keep NaN where the
    # next-day return does not exist (final row) so dropna removes it.
    # The original `(NaN > 0).astype(float)` silently labeled that row 0.0.
    next_ret = df['Return'].shift(-1)
    df['Target'] = (next_ret > 0).astype(float).where(next_ret.notna())
    df = df[['Return', 'MA5', 'MA20', 'RSI', 'Volume_ratio', 'Volatility', 'Target']].dropna()
    return df
# Build the feature table for NVIDIA (requires network access to Yahoo Finance).
df = get_stock_features('NVDA')
print(f"データ件数: {len(df)}")
class StockDataset(Dataset):
    """Sliding-window dataset over the engineered feature table.

    Splits the rows chronologically at ``train_ratio``, standardizes the
    features, and yields ``(window, label)`` pairs where each window holds
    the ``seq_len`` rows preceding the labeled day.

    Parameters
    ----------
    df : pandas.DataFrame
        Output of ``get_stock_features`` — feature columns plus ``Target``.
    seq_len : int
        Number of past days per input window.
    train : bool
        Select the training (first ``train_ratio``) or test (remaining) slice.
    train_ratio : float
        Fraction of rows assigned to the training split.
    """

    def __init__(self, df, seq_len=30, train=True, train_ratio=0.8):
        features = df.drop(columns=['Target']).values
        targets = df['Target'].values
        self.scaler = StandardScaler()
        split = int(len(features) * train_ratio)
        if train:
            features = self.scaler.fit_transform(features[:split])
            targets = targets[:split]
        else:
            # BUG FIX: fit scaling statistics on the training portion only,
            # then apply them to the held-out tail. The original called
            # fit_transform on the FULL array, leaking test-set mean/std
            # into the normalization (look-ahead bias).
            self.scaler.fit(features[:split])
            features = self.scaler.transform(features[split:])
            targets = targets[split:]
        # Build overlapping windows: X[i] covers the seq_len rows before row i.
        self.X, self.y = [], []
        for i in range(seq_len, len(features)):
            self.X.append(features[i - seq_len:i])
            self.y.append(targets[i])
        self.X = torch.FloatTensor(np.array(self.X))
        self.y = torch.FloatTensor(np.array(self.y))

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
# Chronological 80/20 split: training windows precede test windows in time.
train_ds = StockDataset(df, train=True)
test_ds = StockDataset(df, train=False)
# Shuffle only the training batches; evaluation iterates in fixed order.
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
test_loader = DataLoader(test_ds, batch_size=32)
Transformerモデルの実装
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a (batch, seq, d_model) tensor."""

    def __init__(self, d_model, max_len=500, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        # Precompute the sinusoid table once; even dims get sin, odd dims cos.
        positions = torch.arange(0, max_len).unsqueeze(1).float()
        freqs = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        table = torch.zeros(max_len, d_model)
        table[:, 0::2] = torch.sin(positions * freqs)
        table[:, 1::2] = torch.cos(positions * freqs)
        # Buffer (not a parameter): moves with .to(device), excluded from grads.
        self.register_buffer('pe', table.unsqueeze(0))

    def forward(self, x):
        # Add encodings for the first seq_len positions, then apply dropout.
        return self.dropout(x + self.pe[:, :x.size(1)])
class StockTransformer(nn.Module):
    """Transformer encoder that outputs the probability of a next-day up move.

    Input: (batch, seq_len, input_dim) feature windows.
    Output: (batch,) probabilities in (0, 1) from the Sigmoid head.
    """

    def __init__(self, input_dim=6, d_model=64, nhead=4, num_layers=3, dropout=0.1):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model, dropout=dropout)
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=d_model,
                nhead=nhead,
                dim_feedforward=256,
                dropout=dropout,
                batch_first=True,
            ),
            num_layers=num_layers,
        )
        self.classifier = nn.Sequential(
            nn.Linear(d_model, 32),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        hidden = self.transformer(self.pos_encoding(self.input_proj(x)))
        # Classify from the representation of the final time step only.
        last_step = hidden[:, -1, :]
        return self.classifier(last_step).squeeze(-1)
# Instantiate the model for the 6 engineered features and report its size.
model = StockTransformer(input_dim=6)
print(f"パラメータ数: {sum(p.numel() for p in model.parameters()):,}")
学習と評価
# Training setup: AdamW with weight decay plus cosine LR annealing over 50 epochs.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
# BCELoss matches the Sigmoid output head (probabilities, not logits).
criterion = nn.BCELoss()
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
def train_epoch(model, loader, optimizer, criterion, device):
    """Run one optimization pass over *loader*; return (mean batch loss, accuracy)."""
    model.train()
    running_loss = 0.0
    n_correct = 0
    for batch_x, batch_y in loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        probs = model(batch_x)
        batch_loss = criterion(probs, batch_y)
        batch_loss.backward()
        # Clip gradient norm to 1.0 before the step to stabilize training.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        running_loss += batch_loss.item()
        # Threshold at 0.5 to count correct binary predictions.
        n_correct += ((probs > 0.5) == batch_y).sum().item()
    return running_loss / len(loader), n_correct / len(loader.dataset)
def evaluate(model, loader, criterion, device):
    """Compute (mean batch loss, accuracy) over *loader* without gradient updates."""
    model.eval()
    running_loss = 0.0
    n_correct = 0
    # no_grad avoids building the autograd graph during evaluation.
    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            probs = model(batch_x)
            running_loss += criterion(probs, batch_y).item()
            n_correct += ((probs > 0.5) == batch_y).sum().item()
    return running_loss / len(loader), n_correct / len(loader.dataset)
# Train for 50 epochs, checkpointing the weights whenever validation accuracy improves.
best_acc = 0
num_epochs = 50
for epoch in range(1, num_epochs + 1):
    train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, test_loader, criterion, device)
    scheduler.step()
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), 'best_transformer.pth')
    # Log every 10th epoch to keep output readable.
    if epoch % 10 == 0:
        print(f"Epoch {epoch}: Train Loss={train_loss:.4f} Acc={train_acc:.3%} | Val Loss={val_loss:.4f} Acc={val_acc:.3%}")
print(f"最高検証精度: {best_acc:.3%}")
まとめ:TransformerとLSTMの比較
実際に試してみると、TransformerはデータセットによってLSTMより良くも悪くもなります。データ量が少ないとLSTMの方が安定するケースが多い印象です。どちらも過学習しやすいので、ウォークフォワード検証で実運用前に確認するのが現実的だと思っています。

