Pythonで株価予測Transformerモデルを実装する方法|PyTorch・Self-Attention

AI×自動売買

TransformerはNLP向けに開発されたアーキテクチャですが、Self-Attentionにより時系列データの長距離依存関係も捉えることができます。そこで、この記事ではPyTorchでTransformerベースの株価予測モデルを実装し、LSTMとの比較もまとめます。

📘 外部参考: LSTM(Wikipedia)/ Keras LSTM(公式)

📘 外部参考: Hugging Face Transformers(公式ドキュメント)/ Attention Is All You Need(原論文)

📘 外部参考: PyTorch 公式 / PyTorch チュートリアル(公式)

環境構築

pip install torch torchvision yfinance pandas numpy scikit-learn

データの準備

import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader

def get_stock_features(ticker, period='3y'):
    """Download OHLCV data and engineer features plus a binary target.

    Args:
        ticker: yfinance ticker symbol (e.g. 'NVDA').
        period: lookback window string passed to ``yf.download``.

    Returns:
        DataFrame with columns ['Return', 'MA5', 'MA20', 'RSI',
        'Volume_ratio', 'Volatility', 'Target'], all-NaN rows dropped.
        Target is 1.0 when the NEXT day's return is positive, else 0.0.
    """
    df = yf.download(ticker, period=period, progress=False)
    # NOTE(review): newer yfinance versions can return MultiIndex columns even
    # for a single ticker — flatten defensively so df['Close'] stays a Series.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    df['Return'] = df['Close'].pct_change()
    # Moving-average gap: relative distance of the N-day mean from today's close.
    df['MA5'] = df['Close'].rolling(5).mean() / df['Close'] - 1
    df['MA20'] = df['Close'].rolling(20).mean() / df['Close'] - 1

    # RSI (simple-moving-average variant), rescaled from [0, 100] to [0, 1].
    delta = df['Close'].diff()
    gain = delta.clip(lower=0).rolling(14).mean()
    loss = (-delta.clip(upper=0)).rolling(14).mean()
    df['RSI'] = (100 - 100 / (1 + gain / loss)) / 100

    df['Volume_ratio'] = df['Volume'] / df['Volume'].rolling(20).mean()
    df['Volatility'] = df['Return'].rolling(10).std()

    # Predict next-day direction (binary classification).
    # BUG FIX: the original used (shift(-1) > 0).astype(float), which labels
    # the final row 0.0 because (NaN > 0) is False — a fabricated label that
    # dropna() cannot remove. Mark undefined targets NaN so they are dropped.
    next_ret = df['Return'].shift(-1)
    df['Target'] = np.where(next_ret.isna(), np.nan, (next_ret > 0).astype(float))

    df = df[['Return', 'MA5', 'MA20', 'RSI', 'Volume_ratio', 'Volatility', 'Target']].dropna()
    return df

# Build the feature table once at import time (performs a network call to
# Yahoo Finance via yfinance).
df = get_stock_features('NVDA')
print(f"データ件数: {len(df)}")

class StockDataset(Dataset):
    """Sliding-window dataset over the engineered feature table.

    Produces (X, y) pairs where X is a (seq_len, n_features) float window of
    standardized features and y is the binary next-day direction label of the
    day immediately after the window.

    Args:
        df: DataFrame with feature columns plus a 'Target' column.
        seq_len: number of past days per input window.
        train: if True, yield windows from the first ``train_ratio`` of rows;
            otherwise from the remainder (chronological split, no shuffling).
        train_ratio: fraction of rows used for the training split.
    """

    def __init__(self, df, seq_len=30, train=True, train_ratio=0.8):
        features = df.drop(columns=['Target']).values
        targets = df['Target'].values

        self.scaler = StandardScaler()
        split = int(len(features) * train_ratio)

        # BUG FIX: the original test branch called fit_transform on the FULL
        # dataset — the scaler was fit on test data (look-ahead leakage) and
        # its statistics differed from the training scaler. Always fit on the
        # training split only, then transform.
        self.scaler.fit(features[:split])
        if train:
            features = self.scaler.transform(features[:split])
            targets = targets[:split]
        else:
            features = self.scaler.transform(features)[split:]
            targets = targets[split:]

        # NOTE: the first seq_len rows after the split boundary are used only
        # as context, never as targets, so the two splits never share labels.
        self.X, self.y = [], []
        for i in range(seq_len, len(features)):
            self.X.append(features[i-seq_len:i])
            self.y.append(targets[i])

        self.X = torch.FloatTensor(np.array(self.X))
        self.y = torch.FloatTensor(np.array(self.y))

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Chronological 80/20 split. Shuffling the training WINDOWS is fine (each
# window is self-contained); the test loader keeps time order.
train_ds = StockDataset(df, train=True)
test_ds = StockDataset(df, train=False)
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
test_loader = DataLoader(test_ds, batch_size=32)

Transformerモデルの実装

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position embeddings (Vaswani et al., 2017).

    A (1, max_len, d_model) table is precomputed once and registered as a
    buffer (moves with the module across devices, excluded from gradients).
    Even feature indices carry sine, odd indices cosine, with geometrically
    spaced frequencies.
    """

    def __init__(self, d_model, max_len=500, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        positions = torch.arange(max_len, dtype=torch.float32).unsqueeze(1)
        # Frequencies: 1 / 10000^(2i / d_model) for each even index 2i.
        freqs = torch.exp(
            (-np.log(10000.0) / d_model)
            * torch.arange(0, d_model, 2, dtype=torch.float32)
        )
        table = torch.zeros(max_len, d_model)
        table[:, 0::2] = torch.sin(positions * freqs)
        table[:, 1::2] = torch.cos(positions * freqs)
        self.register_buffer('pe', table.unsqueeze(0))

    def forward(self, x):
        """Add position encodings for the first x.size(1) steps, then dropout."""
        return self.dropout(x + self.pe[:, :x.size(1)])

class StockTransformer(nn.Module):
    """Encoder-only Transformer for next-day up/down classification.

    Pipeline: linear projection of the raw features into d_model dimensions,
    sinusoidal positional encoding, a stack of Transformer encoder layers,
    then an MLP head with sigmoid applied to the LAST time step.

    Args:
        input_dim: number of features per time step.
        d_model: encoder hidden width.
        nhead: attention heads per layer.
        num_layers: number of stacked encoder layers.
        dropout: dropout rate used throughout.
    """

    def __init__(self, input_dim=6, d_model=64, nhead=4, num_layers=3, dropout=0.1):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model, dropout=dropout)

        layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=256,
            dropout=dropout,
            batch_first=True,  # inputs are (batch, seq, feature)
        )
        self.transformer = nn.TransformerEncoder(layer, num_layers=num_layers)

        self.classifier = nn.Sequential(
            nn.Linear(d_model, 32),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Map (batch, seq, input_dim) to per-sample up-probabilities (batch,)."""
        hidden = self.pos_encoding(self.input_proj(x))
        encoded = self.transformer(hidden)
        # Classify from the final time step's representation only.
        last_step = encoded[:, -1, :]
        return self.classifier(last_step).squeeze(-1)

# Instantiate the model with the 6 engineered features as input width.
model = StockTransformer(input_dim=6)
print(f"パラメータ数: {sum(p.numel() for p in model.parameters()):,}")

学習と評価

# Training setup: AdamW with weight decay, BCE loss on the model's sigmoid
# outputs, and cosine LR annealing over the 50 planned epochs.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
criterion = nn.BCELoss()
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)

def train_epoch(model, loader, optimizer, criterion, device):
    """Run one optimization pass over ``loader``.

    Applies gradient-norm clipping at 1.0 before each optimizer step.

    Returns:
        (mean batch loss, accuracy over the whole dataset) as floats.
        Accuracy thresholds predictions at 0.5.
    """
    model.train()
    running_loss = 0.0
    n_correct = 0
    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        batch_loss = criterion(outputs, labels)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        running_loss += batch_loss.item()
        n_correct += ((outputs > 0.5) == labels).sum().item()
    return running_loss / len(loader), n_correct / len(loader.dataset)

def evaluate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without gradient tracking.

    Returns:
        (mean batch loss, accuracy over the whole dataset) as floats.
        Accuracy thresholds predictions at 0.5, matching train_epoch.
    """
    model.eval()
    loss_sum = 0.0
    hits = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            probs = model(inputs)
            loss_sum += criterion(probs, labels).item()
            hits += ((probs > 0.5) == labels).sum().item()
    return loss_sum / len(loader), hits / len(loader.dataset)

# Train for 50 epochs; checkpoint the weights whenever validation accuracy
# improves, and log progress every 10 epochs.
best_acc = 0
for epoch in range(1, 51):
    train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, test_loader, criterion, device)
    scheduler.step()  # cosine annealing advances once per epoch
    
    # Keep only the best-so-far weights on disk.
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), 'best_transformer.pth')
    
    if epoch % 10 == 0:
        print(f"Epoch {epoch}: Train Loss={train_loss:.4f} Acc={train_acc:.3%} | Val Loss={val_loss:.4f} Acc={val_acc:.3%}")

print(f"最高検証精度: {best_acc:.3%}")

まとめ:TransformerとLSTMの比較

実際に試してみると、TransformerはデータセットによってLSTMより良くも悪くもなります。データ量が少ないとLSTMの方が安定するケースが多い印象です。どちらも過学習しやすいので、ウォークフォワード検証で実運用前に確認するのが現実的だと思っています。

タイトルとURLをコピーしました