※本記事のコードや情報は執筆時点の仕様に基づいています。投資は自己責任であり、必ずデモ環境や少額資金でテストした上で運用してください。
「AIで株価を予想すれば、必ず儲かる」という触れ込みで売られているシステムは数多くありますが、実際のところ、ほぼ100%が過学習(カーブフィッティング)という陥穽に引っかかっています。過去データに完璧に適合したモデルは、未来の市場では通用しないという厳しい現実があります。
しかし「AIが株価予想に全く役に立たない」というわけではありません。正しく構築し、正しく検証されたAIモデルは、単純なテクニカル指標よりも高い精度で、短期~中期の価格方向性を予測する可能性があります。その鍵は、モデルの構築方法、検証方法、そして実装時の謙虚さにあります。
本記事では、yfinanceで取得した実データを使い、ニューラルネットワークやスキットラーン(scikit-learn)の機械学習モデルを構築し、厳格な訓練・検証・テストの3段階分割を実装します。さらに、過学習を検出する方法、モデル精度の正当な評価方法、そして実際の売買シグナルへの活用方法までを、すべてコード付きで解説します。
機械学習による株価予想の限界と現実的な期待値
AI株価予想の話を始める前に、その限界を理解することが極めて重要です。過度な期待が失敗につながります。
なぜAIは株価を「完全に」予想できないのか
株価の動きは、以下の複数の要因から成り立っています。
- テクニカル要因: 過去の価格パターン、移動平均線、RSIなど(機械学習が捉えやすい)
- ファンダメンタル要因: 企業業績、金利、GDP成長率など(データとしては遅延が大きい)
- 市場心理要因: 投資家の感情、リスク回避行動、群集心理(定量化が困難)
- ランダムノイズ: 突発的なニュース、テロ、戦争など(予測不可能)
📘 外部参考:Moving Average(Investopedia)
📘 外部参考:移動平均(Wikipedia 日本語)
機械学習モデルは、過去のデータから統計的なパターンを学習します。しかし、株式市場には「構造的な転換点」(相場の大きな転換)が存在し、過去のパターンが未来で通用する保証がないのです。
現実的な期待値:「精度50~55%」が目標
コイン投げでランダムに買い・売りを決めた場合、勝率は50%です。一方、優れた機械学習モデルが実現できる精度は、せいぜい51~55%程度です。
これは一見、わずかな差に見えます。しかし、複利の力と取引回数を考えると、この5%の優位性が長期的には大きな利益を生み出す可能性があります。
| 勝率 | 年間取引数 | 1回あたりリターン | 期待値 |
|---|---|---|---|
| 50%(ランダム) | 100回 | +1% / -1% | 0%(コストが赤字) |
| 52%(簡単なAI) | 100回 | +1% / -1% | +2%(手数料考慮後) |
| 55%(優秀なAI) | 100回 | +1% / -1% | +5%(手数料考慮後) |
重要な認識: AIが株価の「方向性」を51~55%の精度で予想できれば、それで十分です。完全な予想は不可能ですし、求めるべきではありません。
【コピペOK】機械学習モデルの構築:LSTM+ニューラルネットワークによる株価予想
📘 外部参考:LSTM(Wikipedia) / Keras LSTM(公式)
では、実際にAIモデルを構築します。LSTMニューラルネットワークを使用して、過去60日間のデータから翌日の価格方向を予想するモデルを実装します。
import numpy as np
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
import warnings
warnings.filterwarnings('ignore')
# ==============================
# 設定エリア
# ==============================
SYMBOL = "7203.T" # トヨタ自動車
START_DATE = "2018-01-01"
END_DATE = datetime.now().strftime('%Y-%m-%d')
LOOKBACK_PERIOD = 60 # 過去60日のデータを使用
TEST_RATIO = 0.2 # テスト期間の比率
VALIDATION_RATIO = 0.1 # 検証期間の比率
# ==============================
# データ取得
# ==============================
def fetch_stock_data(symbol, start_date, end_date):
"""
yfinanceから株価データを取得
Args:
symbol (str): 銘柄コード
start_date (str): 開始日付
end_date (str): 終了日付
Returns:
pd.DataFrame: OHLCV データ
"""
print(f"[{datetime.now()}] {symbol} のデータを取得中({start_date}~{end_date})...")
try:
ticker = yf.Ticker(symbol)
df = ticker.history(start=start_date, end=end_date)
if df.empty:
raise ValueError("データが取得できませんでした")
print(f"[{datetime.now()}] 取得完了: {len(df)}営業日分")
return df
except Exception as e:
print(f"エラー: {e}")
return None
# ==============================
# 訓練・検証・テストデータの分割
# ==============================
def split_train_val_test(df, train_ratio=0.6, val_ratio=0.2, test_ratio=0.2):
"""
訓練・検証・テストデータを時系列で分割
Args:
df (pd.DataFrame): 全期間のデータ
train_ratio, val_ratio, test_ratio: 各期間の比率
Returns:
tuple: (df_train, df_val, df_test)
"""
print(f"\n【データセット分割】")
n = len(df)
train_end = int(n * train_ratio)
val_end = train_end + int(n * val_ratio)
df_train = df[:train_end]
df_val = df[train_end:val_end]
df_test = df[val_end:]
print(f"訓練期間: {df_train.index[0].date()} ~ {df_train.index[-1].date()} ({len(df_train)}日)")
print(f"検証期間: {df_val.index[0].date()} ~ {df_val.index[-1].date()} ({len(df_val)}日)")
print(f"テスト期間: {df_test.index[0].date()} ~ {df_test.index[-1].date()} ({len(df_test)}日)")
return df_train, df_val, df_test
# ==============================
# 教師データの作成
# ==============================
def create_sequences(df, lookback=60):
"""
訓練用の時系列シーケンスを作成
Args:
df (pd.DataFrame): 価格データ
lookback (int): 過去何日分のデータを使うか
Returns:
tuple: (X, y) 入力データと目標ラベル
X: (サンプル数, lookback, 特徴数)
y: (サンプル数,) 0=下降, 1=上昇
"""
X, y = [], []
prices = df['Close'].values
for i in range(len(prices) - lookback):
# 過去60日のデータ
X.append(prices[i:i+lookback])
# 翌日の価格が上がったか下がったかを判定
if prices[i+lookback] > prices[i+lookback-1]:
y.append(1) # 上昇
else:
y.append(0) # 下降
return np.array(X), np.array(y)
# ==============================
# データの正規化
# ==============================
def normalize_data(X_train, X_val, X_test):
"""
データを訓練データの統計量を使って正規化
Args:
X_train, X_val, X_test: 各期間のデータ
Returns:
tuple: (X_train_norm, X_val_norm, X_test_norm, scaler)
"""
print(f"\n【データの正規化】")
scaler = MinMaxScaler(feature_range=(0, 1))
# 訓練データで正規化パラメータを学習
X_train_reshaped = X_train.reshape(-1, 1)
scaler.fit(X_train_reshaped)
# すべてのデータを正規化
X_train_norm = scaler.transform(X_train.reshape(-1, 1)).reshape(X_train.shape)
X_val_norm = scaler.transform(X_val.reshape(-1, 1)).reshape(X_val.shape)
X_test_norm = scaler.transform(X_test.reshape(-1, 1)).reshape(X_test.shape)
print(f"正規化完了: データを0~1の範囲に変換")
return X_train_norm, X_val_norm, X_test_norm, scaler
# ==============================
# LSTMモデルの構築
# ==============================
def build_lstm_model(lookback):
"""
LSTMニューラルネットワークモデルを構築
Args:
lookback (int): 入力シーケンス長
Returns:
tf.keras.Model: コンパイル済みモデル
"""
print(f"\n【LSTMモデルの構築】")
model = Sequential([
# LSTM層1: 50ユニット
LSTM(50, return_sequences=True, input_shape=(lookback, 1)),
Dropout(0.2),
# LSTM層2: 50ユニット
LSTM(50, return_sequences=False),
Dropout(0.2),
# 全結合層
Dense(25),
# 出力層(シグモイド活性化で0~1を出力)
Dense(1, activation='sigmoid'),
])
# モデルをコンパイル
model.compile(
optimizer=Adam(learning_rate=0.001),
loss='binary_crossentropy',
metrics=['accuracy']
)
print(f"モデル構成:")
model.summary()
return model
# ==============================
# モデルの訓練
# ==============================
def train_model(model, X_train, y_train, X_val, y_val, epochs=50, batch_size=32):
"""
モデルを訓練
Args:
model: Kerasモデル
X_train, y_train: 訓練データ
X_val, y_val: 検証データ
epochs: エポック数
batch_size: バッチサイズ
Returns:
tf.keras.callbacks.History: 訓練履歴
"""
print(f"\n【モデルの訓練】")
print(f"エポック数: {epochs}, バッチサイズ: {batch_size}")
history = model.fit(
X_train, y_train,
epochs=epochs,
batch_size=batch_size,
validation_data=(X_val, y_val),
verbose=0
)
print(f"訓練完了")
print(f"最終訓練精度: {history.history['accuracy'][-1]*100:.2f}%")
print(f"最終検証精度: {history.history['val_accuracy'][-1]*100:.2f}%")
return history
# ==============================
# モデルの評価
# ==============================
def evaluate_model(model, X_test, y_test):
"""
テストデータでモデルを評価
Args:
model: 訓練済みモデル
X_test, y_test: テストデータ
Returns:
dict: 評価指標
"""
print(f"\n【テストデータでの評価】")
# 予測
y_pred_prob = model.predict(X_test, verbose=0)
y_pred = (y_pred_prob > 0.5).astype(int).flatten()
# 精度を計算
accuracy = accuracy_score(y_test, y_pred)
# 混同行列
cm = confusion_matrix(y_test, y_pred)
tn, fp, fn, tp = cm.ravel()
# 詳細指標
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
print(f"精度(Accuracy): {accuracy*100:.2f}%")
print(f"適合率(Precision): {precision*100:.2f}%")
print(f"再現率(Recall): {recall*100:.2f}%")
print(f"F1スコア: {f1_score:.3f}")
print(f"\n混同行列:")
print(f" 真陰性(TN): {tn} 件")
print(f" 偽陽性(FP): {fp} 件")
print(f" 偽陰性(FN): {fn} 件")
print(f" 真陽性(TP): {tp} 件")
return {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1_score': f1_score,
'confusion_matrix': cm,
'y_pred': y_pred,
'y_pred_prob': y_pred_prob,
}
# ==============================
# 売買シグナルの生成
# ==============================
def generate_trading_signals(model, X_test, scaler, df_test, threshold=0.55):
"""
モデルの予測から売買シグナルを生成
Args:
model: 訓練済みモデル
X_test: テスト用入力データ
scaler: 正規化器
df_test: テスト期間のデータ
threshold: 買いシグナルの判定閾値(0.5より高い=モデルの確度が高い)
Returns:
pd.DataFrame: シグナル情報を含むDataFrame
"""
print(f"\n【売買シグナルの生成】")
# 予測確率を取得
y_pred_prob = model.predict(X_test, verbose=0).flatten()
# シグナル信号を作成
signals = pd.DataFrame(index=df_test.index[60:]) # シーケンス作成のため最初の60日はスキップ
signals['Predicted_Prob'] = y_pred_prob
signals['Signal'] = 0
# 閾値ベースのシグナル
signals['Signal'] = ((y_pred_prob > threshold).astype(int) - (y_pred_prob < (1-threshold)).astype(int))
# 1 = 買いシグナル, 0 = 中立, -1 = 売りシグナル
signals['Actual_Price'] = df_test['Close'].values[60:]
print(f"買いシグナル: {(signals['Signal'] == 1).sum()}回")
print(f"売りシグナル: {(signals['Signal'] == -1).sum()}回")
print(f"中立シグナル: {(signals['Signal'] == 0).sum()}回")
return signals
# ==============================
# メイン処理
# ==============================
if __name__ == "__main__":
# データ取得
df = fetch_stock_data(SYMBOL, START_DATE, END_DATE)
if df is not None:
# 訓練・検証・テストデータを分割
df_train, df_val, df_test = split_train_val_test(df, train_ratio=0.6, val_ratio=0.2, test_ratio=0.2)
# シーケンスデータを作成
X_train, y_train = create_sequences(df_train, LOOKBACK_PERIOD)
X_val, y_val = create_sequences(df_val, LOOKBACK_PERIOD)
X_test, y_test = create_sequences(df_test, LOOKBACK_PERIOD)
print(f"\n【教師データの統計】")
print(f"訓練: {len(X_train)}サンプル(上昇: {y_train.sum()}, 下降: {len(y_train) - y_train.sum()})")
print(f"検証: {len(X_val)}サンプル(上昇: {y_val.sum()}, 下降: {len(y_val) - y_val.sum()})")
print(f"テスト: {len(X_test)}サンプル(上昇: {y_test.sum()}, 下降: {len(y_test) - y_test.sum()})")
# データを正規化
X_train_norm, X_val_norm, X_test_norm, scaler = normalize_data(X_train, X_val, X_test)
# LSTMモデルを構築
model = build_lstm_model(LOOKBACK_PERIOD)
# モデルを訓練
history = train_model(model, X_train_norm, y_train, X_val_norm, y_val, epochs=50, batch_size=32)
# テストデータで評価
eval_result = evaluate_model(model, X_test_norm, y_test)
# 売買シグナルを生成
signals = generate_trading_signals(model, X_test_norm, scaler, df_test, threshold=0.55)
# 結果を表示
print(f"\n【直近10営業日のシグナル】")
print(signals[['Predicted_Prob', 'Signal', 'Actual_Price']].tail(10))
このコードの処理フロー:
split_train_val_test()で時系列に沿ってデータを訓練(60%)、検証(20%)、テスト(20%)に分割create_sequences()で過去60日のデータから翌日の上昇・下降を予想する教師データを作成build_lstm_model()でLSTMニューラルネットワークを構築(2層のLSTM + Dropout)train_model()でモデルを訓練(検証精度を監視)evaluate_model()でテストデータでの精度を評価generate_trading_signals()で売買シグナルを生成
scikit-learnによる従来型機械学習との比較
LSTMは強力ですが、計算量が多く、過学習のリスクもあります。より軽量な従来型機械学習(ランダムフォレスト、勾配ブースティング)との比較も重要です。
【コピペOK】複数モデルの比較評価システム
import numpy as np
import pandas as pd
import yfinance as yf
from datetime import datetime
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import warnings
warnings.filterwarnings('ignore')
# ==============================
# 設定エリア
# ==============================
SYMBOL = "7203.T"
START_DATE = "2020-01-01"
END_DATE = datetime.now().strftime('%Y-%m-%d')
LOOKBACK_PERIOD = 20
# ==============================
# テクニカル特徴量の作成
# ==============================
def create_features(df, lookback=20):
"""
テクニカル指標から機械学習用の特徴量を作成
Args:
df (pd.DataFrame): 価格データ
lookback (int): 過去何日分の指標を使うか
Returns:
tuple: (X, y) 特徴量と目標ラベル
"""
print(f"特徴量を作成中...")
df = df.copy()
# 基本的なテクニカル指標を計算
df['MA_SHORT'] = df['Close'].rolling(window=5).mean()
df['MA_LONG'] = df['Close'].rolling(window=20).mean()
df['RSI'] = calculate_rsi(df['Close'], 14)
df['Volatility'] = df['Close'].pct_change().rolling(window=20).std()
df['Volume_MA'] = df['Volume'].rolling(window=20).mean()
# 日次リターンを計算
df['Daily_Return'] = df['Close'].pct_change()
# 目標変数:翌日の価格が上がったか(1)下がったか(0)
df['Target'] = (df['Daily_Return'].shift(-1) > 0).astype(int)
# 特徴量を選択
feature_columns = ['MA_SHORT', 'MA_LONG', 'RSI', 'Volatility', 'Volume_MA', 'Daily_Return']
# 欠損値を削除
df = df.dropna()
X = df[feature_columns].values
y = df['Target'].values
print(f"作成完了: {X.shape[0]}サンプル × {X.shape[1]}特徴量")
return X, y, df.index
def calculate_rsi(series, period=14):
"""RSIを計算"""
delta = series.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
# ==============================
# 複数モデルの訓練と評価
# ==============================
class MultiModelEvaluator:
"""複数の機械学習モデルを訓練し、パフォーマンスを比較"""
def __init__(self, X, y):
"""
Args:
X (np.ndarray): 特徴量
y (np.ndarray): 目標変数
"""
self.X = X
self.y = y
self.results = {}
self.models = {
'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
}
def train_and_evaluate(self, n_splits=5):
"""
時系列分割で複数モデルを訓練・評価
Args:
n_splits (int): クロスバリデーションの分割数
"""
print(f"\n【複数モデルの訓練・評価】")
print(f"時系列分割: {n_splits}回")
tscv = TimeSeriesSplit(n_splits=n_splits)
for model_name, model in self.models.items():
print(f"\n訓練中: {model_name}...", end=" ")
accuracies = []
precisions = []
recalls = []
f1_scores = []
for train_idx, test_idx in tscv.split(self.X):
X_train, X_test = self.X[train_idx], self.X[test_idx]
y_train, y_test = self.y[train_idx], self.y[test_idx]
# データを標準化
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# モデルを訓練
model.fit(X_train, y_train)
# 予測
y_pred = model.predict(X_test)
# 評価指標を計算
accuracies.append(accuracy_score(y_test, y_pred))
precisions.append(precision_score(y_test, y_pred, zero_division=0))
recalls.append(recall_score(y_test, y_pred, zero_division=0))
f1_scores.append(f1_score(y_test, y_pred, zero_division=0))
# 平均値を計算
self.results[model_name] = {
'accuracy': np.mean(accuracies),
'precision': np.mean(precisions),
'recall': np.mean(recalls),
'f1_score': np.mean(f1_scores),
}
print(f"✅ 完了")
def display_comparison(self):
"""モデル比較結果を表示"""
print(f"\n" + "="*70)
print("機械学習モデル比較")
print("="*70)
comparison_df = pd.DataFrame(self.results).T
comparison_df = comparison_df.sort_values('f1_score', ascending=False)
print(comparison_df.round(4))
print(f"\n【推奨モデル】")
best_model = comparison_df.index[0]
print(f"{best_model}")
print(f"F1スコア: {comparison_df['f1_score'].iloc[0]:.4f}")
# ==============================
# メイン処理
# ==============================
if __name__ == "__main__":
# データ取得
print(f"[{datetime.now()}] {SYMBOL} のデータを取得中...")
ticker = yf.Ticker(SYMBOL)
df = ticker.history(start=START_DATE, end=END_DATE)
df = df.fillna(method='ffill')
# 特徴量を作成
X, y, dates = create_features(df, LOOKBACK_PERIOD)
# 複数モデルを訓練・評価
evaluator = MultiModelEvaluator(X, y)
evaluator.train_and_evaluate(n_splits=5)
# 結果を表示
evaluator.display_comparison()
このコードの処理フロー:
create_features()でテクニカル指標(移動平均線、RSI、ボラティリティ)から特徴量を作成MultiModelEvaluatorで3つの異なるアルゴリズム(ロジスティック回帰、ランダムフォレスト、勾配ブースティング)を比較- 時系列分割によるクロスバリデーションで、過学習を避けながら正当な精度評価
過学習の検出と実務的な活用ガイドライン
バックテスト結果が素晴らしくても、実運用で失敗することの最大の理由は「過学習」です。これを検出する方法を解説します。
📘 外部参考:Backtesting.py(公式ドキュメント) / Backtrader 公式
過学習を検出するチェックリスト
以下の指標が全て満たされることが、実運用推奨の条件です。
| 指標 | 良好な基準 | 注意が必要 | 危険 |
|---|---|---|---|
| テスト精度 | 52~55% | 50~52%, 55~58% | >60%, <50% |
| 訓練精度 – テスト精度 | <2% | 2~5% | >5% |
| F1スコア | >0.45 | 0.40~0.45 | <0.40 |
| 複数時期でのテスト | 全期間で一貫 | 期間により変動 | 期間で大きく異なる |
| 複数銘柄での検証 | 複数銘柄で有効 | 1銘柄のみ有効 | 銘柄により異なる |
よくあるエラーと対処法
モデルの精度がランダム(50%)と変わりません
原因:
- 特徴量に予測力がない
- 訓練期間が短すぎる
- モデルが未訓練状態
対処法:
- テクニカル指標を増やす: RSI、MACD、ボリンジャーバンドなど
- より長期のデータで訓練: 3~5年分のデータを使用
- エポック数やバッチサイズを調整
📘 外部参考:Bollinger Bands 公式 / Wikipedia
メモリ不足エラーが発生します
原因:
- LSTMモデルが大きすぎる
- バッチサイズが大きすぎる
- 訓練データが多すぎる
対処法:
- LSTM層のユニット数を減らす: 50→25に変更
- バッチサイズを減らす: 32→16に変更
- データを減らす: より短い期間で訓練
テストデータで高い精度が出ているのに、実運用で利益が出ません
原因:
- 過学習している
- 手数料とスリッページを考慮していない
- テスト期間の市場環境が特殊だった
対処法:
- 訓練・検証・テスト期間を分割し直す
- シミュレーション時に手数料+0.2%、スリッページ+0.1%を反映
- 異なる時期、異なる銘柄で検証
「FutureWarning」などの警告が大量に出ます
原因:
- Pandasやライブラリのバージョンが古い、または新しい
- 非推奨のメソッドを使用している
対処法:
- ライブラリをアップデート:
pip install --upgrade pandas scikit-learn tensorflow - 警告を無視:
import warnings; warnings.filterwarnings('ignore')
まとめ
本記事では、AIを使った株価予想システムをゼロから自作する方法を、限界と現実的な期待値を交えて解説しました。
要点を整理します。
- 機械学習による株価予想は、「完全な予想」ではなく「方向性の確度51~55%」を目指すべき
- LSTMとscikit-learnの従来型機械学習は、それぞれ異なる利点と欠点がある
- 訓練・検証・テストの3段階分割は、過学習を避けるための必須条件
- テスト精度とシャープレシオ、複数銘柄での検証が、実運用の成功を左右する
- 手数料とスリッページを必ず考慮し、バックテスト時に反映させる
- AIモデルでも完全な予想は不可能。謙虚さを持って運用することが重要
次のステップとしては、本記事のコードで複数銘柄、複数時期でテストを実施し、モデルの汎用性を確認することをお勧めします。その後、デモ口座で3~6ヶ月の前向きテストを実施してから、本運用に移行してください。
AIは強力なツールですが、魔法ではありません。統計的な優位性を持つシステムを謙虚に運用することが、個人投資家の成功の鍵です。
