※本記事のコードや情報は執筆時点の仕様に基づいています。投資は自己責任であり、必ずデモ環境や少額資金でテストした上で運用してください。
アルゴリズム取引における戦略選択は、単なる「どの指標を使うか」という技術的な問題ではなく、「市場環境に対して、どの戦略が最適なのか」という根本的な問いに答える必要があります。多くの個人投資家は、テクニカル指標を組み合わせることで万能な戦略を目指しますが、実際には「トレンドフォロー戦略」と「平均回帰戦略」という2つの対極の思想が存在し、市場環境によってどちらが機能するかは大きく異なるのです。
トレンドフォロー戦略は、「相場は一度トレンドが始まると、その方向に惰性で進み続ける」という仮説に基づいています。一方、平均回帰戦略は、「相場は過度に上昇・下降した後、必ず平均値に戻る」という仮説に基づいています。これら2つの戦略は互いに相容れない前提を持ちながらも、市場環境によって驚くほどの効果差が生まれるのです。
本記事では、これら2つの主要戦略をPythonで実装する具体的な方法を解説し、さらにはそれらを「環境認識ロジック」と組み合わせることで、市場の変化に自動的に対応するハイブリッド戦略の構築方法を提供します。
トレンドフォロー戦略の設計原理
トレンドフォロー戦略は、市場心理学的には「モメンタム」と呼ばれる現象に基づいています。一度上昇トレンドが形成されると、それを「良い兆候」と判断した新規買い手が次々と参入し、その買い圧力がさらに相場を上昇させるという自己増殖的な現象です。
トレンドフォロー戦略が機能する市場環境
トレンドフォロー戦略は、以下の環境で最大の効力を発揮します:
- トレンドが明確に形成されている時期(強いトレンド市場)
- 中央銀行による金融緩和(流動性が豊富で、投機資金が流入)
- 企業の決算期以降(ファンダメンタルズの変化がトレンドを形成)
- 地政学的リスクイベントの後(一方向への相場変動)
トレンドフォロー戦略が機能しない環境
逆に、以下の環境では、トレンドフォロー戦略は損失を膨らませやすくなります:
- 持ち合い相場(レンジ相場)での横ばい局面
- 金融引き締め局面での方向性の不明確さ
- ボラティリティが急激に上昇した直後
【コピペOK】トレンドフォロー戦略の完全実装
【コピペOK】
以下は、移動平均線クロス、MACD、ADXを組み合わせたトレンドフォロー戦略の完全な実装です。単純な買い売りだけでなく、トレンドの強度を数値化し、ポジションサイズの動的調整も含まれています。
# ====== 設定エリア ======
STOCK_CODE = "9984.T"
START_DATE = "2022-01-01"
END_DATE = "2024-12-31"
# トレンドフォロー戦略パラメータ
FAST_MA = 20
SLOW_MA = 50
MACD_FAST = 12
MACD_SLOW = 26
MACD_SIGNAL = 9
ADX_PERIOD = 14
ADX_THRESHOLD = 25 # ADXが25以上でトレンドが強いと判定
MIN_VOLUME_MULTIPLIER = 1.5 # 出来高が平均の1.5倍以上でシグナル確認
INITIAL_CAPITAL = 1000000
POSITION_SIZE_MULTIPLIER = 0.02 # ポートフォリオの2%をリスク額に設定
# ====== ライブラリのインポート ======
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
# ====== テクニカル指標計算モジュール ======
def calculate_moving_averages(df, fast, slow):
"""移動平均線の計算"""
df['MA_fast'] = df['Close'].rolling(window=fast).mean()
df['MA_slow'] = df['Close'].rolling(window=slow).mean()
return df
def calculate_macd(df, fast=12, slow=26, signal=9):
"""MACD(移動平均収束発散)の計算"""
ema_fast = df['Close'].ewm(span=fast, adjust=False).mean()
ema_slow = df['Close'].ewm(span=slow, adjust=False).mean()
df['MACD'] = ema_fast - ema_slow
df['MACD_Signal'] = df['MACD'].ewm(span=signal, adjust=False).mean()
df['MACD_Histogram'] = df['MACD'] - df['MACD_Signal']
return df
def calculate_adx(df, period=14):
"""ADX(Average Directional Index)の計算 - トレンド強度指標"""
# True Range の計算
df['TR'] = np.maximum(
df['High'] - df['Low'],
np.maximum(
abs(df['High'] - df['Close'].shift(1)),
abs(df['Low'] - df['Close'].shift(1))
)
)
# Plus DM, Minus DM の計算
df['DM_Plus'] = np.where(
(df['High'] - df['High'].shift(1)) > (df['Low'].shift(1) - df['Low']),
df['High'] - df['High'].shift(1),
0
)
df['DM_Plus'] = np.where(df['DM_Plus'] < 0, 0, df['DM_Plus'])
df['DM_Minus'] = np.where(
(df['Low'].shift(1) - df['Low']) > (df['High'] - df['High'].shift(1)),
df['Low'].shift(1) - df['Low'],
0
)
df['DM_Minus'] = np.where(df['DM_Minus'] < 0, 0, df['DM_Minus'])
# ATR(Average True Range)の計算
df['ATR'] = df['TR'].rolling(window=period).mean()
# DI(Directional Index)の計算
df['DI_Plus'] = 100 * (df['DM_Plus'].rolling(window=period).mean() / df['ATR'])
df['DI_Minus'] = 100 * (df['DM_Minus'].rolling(window=period).mean() / df['ATR'])
# ADX の計算
df['DI_Diff'] = abs(df['DI_Plus'] - df['DI_Minus'])
df['DI_Sum'] = df['DI_Plus'] + df['DI_Minus']
df['DX'] = 100 * (df['DI_Diff'] / df['DI_Sum'])
df['ADX'] = df['DX'].rolling(window=period).mean()
return df
def calculate_volume_signal(df, period=20):
"""出来高の移動平均を計算し、相対的な出来高を評価"""
df['Volume_MA'] = df['Volume'].rolling(window=period).mean()
df['Volume_Ratio'] = df['Volume'] / df['Volume_MA']
return df
# ====== トレンド強度の定量化 ======
def calculate_trend_strength(df):
"""
複数の指標を組み合わせて、トレンドの強度(0~100)を計算
評価項目:
- MA乖離率: 短期MAが長期MAからどれだけ離れているか(0~30点)
- MACD: MACDヒストグラムの大きさ(0~30点)
- ADX: トレンド強度指標(0~30点)
- 出来高確認: 出来高が平均以上かどうか(0~10点)
"""
latest = df.iloc[-1]
strength = 0
# 項目1:MA乖離率(0~30点)
if latest['MA_slow'] > 0:
ma_divergence = abs(latest['MA_fast'] - latest['MA_slow']) / latest['MA_slow']
ma_score = min(ma_divergence * 100, 30)
strength += ma_score
# 項目2:MACDヒストグラムの大きさ(0~30点)
if pd.notna(latest['MACD_Histogram']):
macd_score = min(abs(latest['MACD_Histogram']), 30)
strength += macd_score
# 項目3:ADX(0~30点)
if pd.notna(latest['ADX']):
adx_score = min(latest['ADX'], 100) / 100 * 30
strength += adx_score
# 項目4:出来高確認(0~10点)
if latest['Volume_Ratio'] > 1.0:
volume_score = min(latest['Volume_Ratio'] * 10, 10)
strength += volume_score
return min(strength, 100)
# ====== シグナル生成エンジン ======
class TrendFollowingStrategyEngine:
"""トレンドフォロー戦略のシグナル生成"""
def __init__(self, fast_ma, slow_ma, adx_threshold):
self.fast_ma = fast_ma
self.slow_ma = slow_ma
self.adx_threshold = adx_threshold
self.last_signal = 0
self.position_holding = False
def generate_signal(self, df):
"""
売買シグナルを生成
Returns:
signal: 1(買い), -1(売り), 0(待機)
confidence: シグナル信頼度(0~100)
reason: シグナル理由の説明
"""
latest = df.iloc[-1]
prev = df.iloc[-2]
signal = 0
confidence = 0
reason = []
# 条件1:MA クロス(トレンド転換の判定)
ma_bullish = (prev['MA_fast'] <= prev['MA_slow'] and
latest['MA_fast'] > latest['MA_slow'])
ma_bearish = (prev['MA_fast'] >= prev['MA_slow'] and
latest['MA_fast'] < latest['MA_slow'])
if ma_bullish:
signal = 1
confidence += 30
reason.append("ゴールデンクロス(買いシグナル)")
elif ma_bearish:
signal = -1
confidence += 30
reason.append("デッドクロス(売りシグナル)")
# 条件2:MACD確認(シグナルの強度を確認)
if signal == 1:
# 買いシグナルの場合
if latest['MACD'] > latest['MACD_Signal']:
confidence += 25
reason.append("MACD > Signal(買い確認)")
else:
confidence -= 10
signal = 0 # 買いシグナル取り消し
reason.append("MACD が Signal を下回る(買い弱気)")
elif signal == -1:
# 売りシグナルの場合
if latest['MACD'] < latest['MACD_Signal']:
confidence += 25
reason.append("MACD < Signal(売り確認)")
else:
confidence -= 10
signal = 0 # 売りシグナル取り消し
reason.append("MACD が Signal を上回る(売り弱気)")
# 条件3:ADX確認(トレンド強度が十分か)
if signal != 0:
if latest['ADX'] > self.adx_threshold:
confidence += 20
reason.append(f"ADX={latest['ADX']:.1f} > {self.adx_threshold}(トレンド強い)")
else:
confidence -= 15
signal = 0
reason.append(f"ADX={latest['ADX']:.1f} < {self.adx_threshold}(トレンド弱い)")
# 条件4:出来高確認(買収には十分な出来高があるか)
if signal != 0:
if latest['Volume_Ratio'] > MIN_VOLUME_MULTIPLIER:
confidence += 15
reason.append(f"出来高が平均の{latest['Volume_Ratio']:.1f}倍")
else:
confidence -= 10
reason.append("出来高が不足している")
# 信頼度の正規化
confidence = max(0, min(confidence, 100))
return signal, confidence, reason
# ====== ポジションサイズ計算 ======
def calculate_position_size(account_value, risk_amount, entry_price, stop_loss_price):
"""
ATR(Average True Range)ベースのポジションサイズ計算
Args:
account_value: 口座資産
risk_amount: リスク許容額
entry_price: エントリー価格
stop_loss_price: ストップロス価格
Returns:
position_size: 買うべき株数
"""
risk_per_share = abs(entry_price - stop_loss_price)
if risk_per_share == 0:
return 0
position_size = int(risk_amount / risk_per_share)
# 資金不足の場合は調整
max_buyable = int(account_value * 0.95 / entry_price)
position_size = min(position_size, max_buyable)
return position_size
# ====== バックテスト実行 ======
def backtest_trend_following(df, strategy_engine, initial_capital):
"""トレンドフォロー戦略のバックテスト実行"""
# 必要な指標計算
df = calculate_moving_averages(df, FAST_MA, SLOW_MA)
df = calculate_macd(df, MACD_FAST, MACD_SLOW, MACD_SIGNAL)
df = calculate_adx(df, ADX_PERIOD)
df = calculate_volume_signal(df, period=20)
df_clean = df.dropna()
# バックテスト変数
position = 0
entry_price = 0
capital = initial_capital
portfolio_values = []
trades = []
for idx in range(1, len(df_clean)):
current_data = df_clean.iloc[:idx+1]
latest_price = df_clean.iloc[idx]['Close']
# シグナル生成
signal, confidence, reason = strategy_engine.generate_signal(current_data)
# エントリー条件
if signal == 1 and position == 0:
# 買いシグナルが出た場合、10日分のATRを計算してストップロスを設定
atr = df_clean.iloc[idx]['ATR']
stop_loss_price = latest_price - (atr * 2)
risk_amount = capital * POSITION_SIZE_MULTIPLIER
position = calculate_position_size(capital, risk_amount, latest_price, stop_loss_price)
if position > 0:
entry_price = latest_price
trades.append({
'date': df_clean.index[idx],
'type': 'BUY',
'price': latest_price,
'quantity': position,
'confidence': confidence,
'reason': ' | '.join(reason)
})
# ストップロス判定
elif position > 0 and idx > 0:
stop_loss_price = entry_price - (df_clean.iloc[idx]['ATR'] * 2)
if latest_price < stop_loss_price:
# ストップロス発動
capital = position * latest_price
trades.append({
'date': df_clean.index[idx],
'type': 'SELL (STOP LOSS)',
'price': latest_price,
'quantity': position,
'pnl': (latest_price - entry_price) * position
})
position = 0
elif signal == -1:
# 売りシグナルが出た場合
capital = position * latest_price
pnl = (latest_price - entry_price) * position
trades.append({
'date': df_clean.index[idx],
'type': 'SELL',
'price': latest_price,
'quantity': position,
'pnl': pnl,
'reason': ' | '.join(reason)
})
position = 0
# ポートフォリオ価値の計算
portfolio_value = capital + (position * latest_price)
portfolio_values.append(portfolio_value)
return {
'trades': trades,
'final_capital': capital,
'final_value': portfolio_values[-1] if portfolio_values else initial_capital,
'total_return': (portfolio_values[-1] - initial_capital) / initial_capital * 100 if portfolio_values else 0,
'portfolio_history': portfolio_values
}
# ====== メイン実行 ======
def main():
print("=" * 80)
print("【トレンドフォロー戦略 - バックテスト実行】")
print("=" * 80)
print(f"\n対象銘柄: {STOCK_CODE}")
print(f"期間: {START_DATE} ~ {END_DATE}")
print(f"初期資本: ¥{INITIAL_CAPITAL:,}\n")
# データ取得
df = yf.download(STOCK_CODE, start=START_DATE, end=END_DATE, progress=False)
# 戦略エンジン初期化
strategy_engine = TrendFollowingStrategyEngine(FAST_MA, SLOW_MA, ADX_THRESHOLD)
# バックテスト実行
results = backtest_trend_following(df, strategy_engine, INITIAL_CAPITAL)
# 結果表示
print(f"\n【バックテスト結果】\n")
print(f"最終ポートフォリオ価値: ¥{results['final_value']:,.0f}")
print(f"総リターン: {results['total_return']:.2f}%")
print(f"総売買回数: {len(results['trades'])}")
# 取引詳細表示
if results['trades']:
print(f"\n【主要な取引】\n")
for i, trade in enumerate(results['trades'][:10]):
print(f"{i+1}. {trade['date'].strftime('%Y-%m-%d')} {trade['type']}")
print(f" 価格: ¥{trade['price']:.2f}, 数量: {trade['quantity']}株")
if 'pnl' in trade:
print(f" 損益: ¥{trade['pnl']:,.0f}")
if 'reason' in trade:
print(f" 理由: {trade['reason']}")
print()
if __name__ == "__main__":
main()
コード解説:
- calculate_adx: トレンド強度を測定するADX(Average Directional Index)を計算。トレンド相場か判定するために重要
- calculate_trend_strength: 複数指標の信頼度を組み合わせて、トレンド強度を0~100の値で定量化
- TrendFollowingStrategyEngine.generate_signal: MA、MACD、ADX、出来高の4つの条件を組み合わせた多層的なシグナル生成
- calculate_position_size: ATRとリスク許容額に基づいて、動的にポジションサイズを計算し、過度なリスクを避ける
- backtest_trend_following: 実際の株価推移に対してバックテストを実行し、成績を評価
平均回帰戦略の設計原理
平均回帰戦略は、トレンドフォロー戦略と全く逆の哲学に基づいています。「相場は一時的に過度に上昇・下降するが、最終的には平均値に回帰する」という前提です。
平均回帰戦略が機能する環境
- 持ち合い相場(レンジ相場)での往来相場
- ボラティリティが上昇した直後(反動買い・売り狙い)
- 金融引き締め局面での方向性の不明確さ
- 技術的サポート・レジスタンスが機能している局面
平均回帰戦略が機能しない環境
- 明確なトレンドが形成されている時期(下降トレンド中の買いは損失を拡大)
- 構造的な相場転換期(新しい平均値への段階的なシフト)
【コピペOK】平均回帰戦略の完全実装
【コピペOK】
以下は、ボリンジャーバンド、RSI、統計的逆張りを組み合わせた平均回帰戦略です。
# ====== 平均回帰戦略パラメータ ======
BB_PERIOD = 20
BB_STD_DEV = 2
RSI_PERIOD = 14
RSI_OVERBOUGHT = 70
RSI_OVERSOLD = 30
PERCENTILE_PERIOD = 50
PERCENTILE_THRESHOLD = 10 # 下位10パーセンタイル以下で買い
MEAN_REVERSION_INITIAL_CAPITAL = 1000000
MEAN_REVERSION_RISK_PERCENT = 0.02
# ====== 平均回帰戦略エンジン ======
class MeanReversionStrategyEngine:
"""平均回帰戦略のシグナル生成"""
def __init__(self):
pass
def calculate_bollinger_bands(self, df, period=20, num_std=2):
"""ボリンジャーバンドの計算"""
df['BB_MA'] = df['Close'].rolling(window=period).mean()
df['BB_STD'] = df['Close'].rolling(window=period).std()
df['BB_Upper'] = df['BB_MA'] + (num_std * df['BB_STD'])
df['BB_Lower'] = df['BB_MA'] - (num_std * df['BB_STD'])
df['BB_Position'] = (df['Close'] - df['BB_Lower']) / (df['BB_Upper'] - df['BB_Lower'])
return df
def calculate_rsi(self, df, period=14):
"""RSI(相対力指数)の計算"""
delta = df['Close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
df['RSI'] = 100 - (100 / (1 + rs))
return df
def calculate_price_percentile(self, df, period=50):
"""過去N日間の価格パーセンタイル(現在の価格が過去比較でどの位置か)"""
df['Price_Percentile'] = df['Close'].rolling(window=period).apply(
lambda x: (x.iloc[-1] - x.min()) / (x.max() - x.min()) * 100
)
return df
def generate_signal(self, df):
"""
平均回帰シグナルの生成
買いシグナル:価格が下降し過ぎた場合
売りシグナル:価格が上昇し過ぎた場合
"""
latest = df.iloc[-1]
prev = df.iloc[-2]
signal = 0
confidence = 0
reason = []
# 条件1:ボリンジャーバンド(価格が平均から乖離)
bb_lower_breach = (prev['Close'] >= prev['BB_Lower'] and
latest['Close'] < latest['BB_Lower'])
bb_upper_breach = (prev['Close'] <= prev['BB_Upper'] and
latest['Close'] > latest['BB_Upper'])
if bb_lower_breach:
# 下部バンドを下破した → 売られ過ぎ → 買い
signal = 1
confidence += 25
reason.append("下部バンドを下破(売られ過ぎ)")
elif bb_upper_breach:
# 上部バンドを上破した → 買われ過ぎ → 売り
signal = -1
confidence += 25
reason.append("上部バンドを上破(買われ過ぎ)")
# 条件2:RSI確認
if signal == 1 and latest['RSI'] < RSI_OVERSOLD:
confidence += 30
reason.append(f"RSI={latest['RSI']:.1f} < {RSI_OVERSOLD}(極度の売り過ぎ)")
elif signal == 1 and latest['RSI'] > RSI_OVERSOLD:
confidence -= 15
signal = 0
reason.append(f"RSI={latest['RSI']:.1f} > {RSI_OVERSOLD}(買い過ぎではない)")
if signal == -1 and latest['RSI'] > RSI_OVERBOUGHT:
confidence += 30
reason.append(f"RSI={latest['RSI']:.1f} > {RSI_OVERBOUGHT}(極度の買い過ぎ)")
elif signal == -1 and latest['RSI'] < RSI_OVERBOUGHT:
confidence -= 15
signal = 0
reason.append(f"RSI={latest['RSI']:.1f} < {RSI_OVERBOUGHT}(売り過ぎではない)")
# 条件3:価格パーセンタイル(統計的判定)
if signal == 1 and latest['Price_Percentile'] < PERCENTILE_THRESHOLD:
confidence += 20
reason.append(f"価格が過去50日比で下位{latest['Price_Percentile']:.0f}パーセンタイル")
elif signal == 1:
confidence -= 10
reason.append("価格が統計的には高すぎる水準")
if signal == -1 and latest['Price_Percentile'] > (100 - PERCENTILE_THRESHOLD):
confidence += 20
reason.append(f"価格が過去50日比で上位{100-latest['Price_Percentile']:.0f}パーセンタイル")
elif signal == -1:
confidence -= 10
reason.append("価格が統計的には低すぎる水準")
# 条件4:ボリンジャーバンド位置(平均値への回帰の余地)
if signal == 1:
if latest['BB_Position'] < 0.3: # 下部の30%以内
confidence += 15
reason.append("下部バンドに接近(回帰の余地大)")
if signal == -1:
if latest['BB_Position'] > 0.7: # 上部の30%以内
confidence += 15
reason.append("上部バンドに接近(回帰の余地大)")
# 信頼度の正規化
confidence = max(0, min(confidence, 100))
return signal, confidence, reason
# ====== 平均回帰戦略バックテスト ======
def backtest_mean_reversion(df, strategy_engine, initial_capital):
"""平均回帰戦略のバックテスト"""
# 指標計算
df = strategy_engine.calculate_bollinger_bands(df, BB_PERIOD, BB_STD_DEV)
df = strategy_engine.calculate_rsi(df, RSI_PERIOD)
df = strategy_engine.calculate_price_percentile(df, PERCENTILE_PERIOD)
df_clean = df.dropna()
# バックテスト変数
position = 0
entry_price = 0
capital = initial_capital
portfolio_values = []
trades = []
for idx in range(1, len(df_clean)):
current_data = df_clean.iloc[:idx+1]
latest_price = df_clean.iloc[idx]['Close']
latest_data = df_clean.iloc[idx]
# シグナル生成
signal, confidence, reason = strategy_engine.generate_signal(current_data)
# エントリー条件(信頼度60%以上)
if signal == 1 and position == 0 and confidence >= 60:
# 平均回帰買い:前のキャンドルの低値を下回ったら損切り
stop_loss_price = df_clean.iloc[idx-1]['Low'] * 0.99
risk_amount = capital * MEAN_REVERSION_RISK_PERCENT
position = int(risk_amount / (latest_price - stop_loss_price))
if position > 0:
entry_price = latest_price
trades.append({
'date': df_clean.index[idx],
'type': 'BUY (平均回帰)',
'price': latest_price,
'quantity': position,
'confidence': confidence,
'reason': ' | '.join(reason)
})
elif signal == -1 and position == 0 and confidence >= 60:
# 平均回帰売り
stop_loss_price = df_clean.iloc[idx-1]['High'] * 1.01
risk_amount = capital * MEAN_REVERSION_RISK_PERCENT
position = -int(risk_amount / (stop_loss_price - latest_price))
if position < 0:
entry_price = latest_price
trades.append({
'date': df_clean.index[idx],
'type': 'SELL (平均回帰)',
'price': latest_price,
'quantity': abs(position),
'confidence': confidence,
'reason': ' | '.join(reason)
})
# 利益確定・損切り判定
if position > 0:
# 買いポジション保有中
target_price = latest_data['BB_MA'] # 移動平均線を目指す
if latest_price >= target_price:
# 目標価格到達 → 利益確定
capital = position * latest_price
trades.append({
'date': df_clean.index[idx],
'type': 'SELL (利益確定)',
'price': latest_price,
'quantity': position,
'pnl': (latest_price - entry_price) * position
})
position = 0
elif position < 0:
# 売りポジション保有中
target_price = latest_data['BB_MA']
if latest_price <= target_price:
# 目標価格到達 → 利益確定
capital = abs(position) * latest_price
pnl = (entry_price - latest_price) * abs(position)
trades.append({
'date': df_clean.index[idx],
'type': 'BUY (利益確定)',
'price': latest_price,
'quantity': abs(position),
'pnl': pnl
})
position = 0
# ポートフォリオ価値の計算
portfolio_value = capital + (position * latest_price)
portfolio_values.append(portfolio_value)
return {
'trades': trades,
'final_capital': capital,
'final_value': portfolio_values[-1] if portfolio_values else initial_capital,
'total_return': (portfolio_values[-1] - initial_capital) / initial_capital * 100 if portfolio_values else 0,
'portfolio_history': portfolio_values
}
# ====== メイン実行 ======
def main_mean_reversion():
print("=" * 80)
print("【平均回帰戦略 - バックテスト実行】")
print("=" * 80)
print(f"\n対象銘柄: {STOCK_CODE}")
print(f"期間: {START_DATE} ~ {END_DATE}")
print(f"初期資本: ¥{MEAN_REVERSION_INITIAL_CAPITAL:,}\n")
# データ取得
df = yf.download(STOCK_CODE, start=START_DATE, end=END_DATE, progress=False)
# 戦略エンジン初期化
strategy_engine = MeanReversionStrategyEngine()
# バックテスト実行
results = backtest_mean_reversion(df, strategy_engine, MEAN_REVERSION_INITIAL_CAPITAL)
# 結果表示
print(f"\n【バックテスト結果】\n")
print(f"最終ポートフォリオ価値: ¥{results['final_value']:,.0f}")
print(f"総リターン: {results['total_return']:.2f}%")
print(f"総売買回数: {len(results['trades'])}")
if results['trades']:
print(f"\n【主要な取引】\n")
for i, trade in enumerate(results['trades'][:10]):
print(f"{i+1}. {trade['date'].strftime('%Y-%m-%d')} {trade['type']}")
print(f" 価格: ¥{trade['price']:.2f}, 数量: {trade['quantity']}株")
if 'reason' in trade:
print(f" 理由: {trade['reason']}")
print()
if __name__ == "__main__":
main_mean_reversion()
コード解説:
- calculate_bollinger_bands: ボリンジャーバンドを計算し、価格が平均からどれだけ乖離しているかを可視化
- calculate_price_percentile: 過去50日間の価格レンジ内で、現在の価格がどのパーセンタイルに位置するかを計算
- generate_signal: ボリンジャーバンド、RSI、パーセンタイルの3つの条件で、平均回帰の買い売りシグナルを生成
- 目標価格の設定: 利益確定目標を移動平均線に設定し、平均値への回帰を狙う
【コピペOK】市場環境に応じたハイブリッド戦略
【コピペOK】
トレンド市場なのか、持ち合い相場なのかを自動判定して、戦略を切り替えるハイブリッドシステムです。
# ====== ハイブリッド戦略設定 ======
HYBRID_INITIAL_CAPITAL = 1000000
TREND_DETECTION_WINDOW = 30 # トレンド検出期間(日)
# ====== 市場環境判定エンジン ======
class MarketEnvironmentDetector:
"""相場環境(トレンド vs レンジ)を自動判定"""
@staticmethod
def detect_environment(df, adx_threshold=25, atr_period=14):
"""
市場環境を判定
Returns:
environment: 'STRONG_TREND', 'WEAK_TREND', 'RANGING'
confidence: 判定信頼度(0~100)
"""
latest = df.iloc[-1]
# ADXが25以上 → トレンド相場
if latest['ADX'] > adx_threshold:
# さらに、+DIが-DIより大きければ上昇トレンド、逆なら下降トレンド
if latest['DI_Plus'] > latest['DI_Minus']:
return 'UPTREND', min(latest['ADX'] / 50 * 100, 100)
else:
return 'DOWNTREND', min(latest['ADX'] / 50 * 100, 100)
else:
# ADX < 25 → レンジ相場
return 'RANGING', (25 - latest['ADX']) / 25 * 50 + 50
@staticmethod
def calculate_historic_volatility(df, period=20):
"""ボラティリティの計算"""
returns = df['Close'].pct_change()
volatility = returns.rolling(window=period).std() * np.sqrt(252) # 年率化
return volatility
# ====== ハイブリッド戦略エンジン ======
class HybridStrategyEngine:
"""市場環境に応じた戦略の自動切り替え"""
def __init__(self):
self.trend_engine = TrendFollowingStrategyEngine(FAST_MA, SLOW_MA, ADX_THRESHOLD)
self.reversion_engine = MeanReversionStrategyEngine()
self.environment_detector = MarketEnvironmentDetector()
def generate_hybrid_signal(self, df):
"""
市場環境を判定してから、適切な戦略でシグナル生成
"""
# 市場環境判定
environment, env_confidence = self.environment_detector.detect_environment(df)
if environment in ['UPTREND', 'DOWNTREND']:
# トレンド相場 → トレンドフォロー戦略を使用
signal, confidence, reason = self.trend_engine.generate_signal(df)
reason.insert(0, f"[{environment}] トレンド相場 - トレンドフォロー戦略")
else:
# レンジ相場 → 平均回帰戦略を使用
signal, confidence, reason = self.reversion_engine.generate_signal(df)
reason.insert(0, f"[RANGING] レンジ相場 - 平均回帰戦略")
# 環境判定の信頼度も信頼度スコアに反映
confidence = confidence * 0.8 + env_confidence * 0.2
return signal, confidence, reason, environment
def backtest_hybrid(self, df, initial_capital):
"""ハイブリッド戦略のバックテスト"""
# 事前に全ての指標を計算
df = calculate_moving_averages(df, FAST_MA, SLOW_MA)
df = calculate_macd(df, MACD_FAST, MACD_SLOW, MACD_SIGNAL)
df = calculate_adx(df, ADX_PERIOD)
df = calculate_volume_signal(df, period=20)
df = self.reversion_engine.calculate_bollinger_bands(df, BB_PERIOD, BB_STD_DEV)
df = self.reversion_engine.calculate_rsi(df, RSI_PERIOD)
df = self.reversion_engine.calculate_price_percentile(df, PERCENTILE_PERIOD)
df_clean = df.dropna()
# バックテスト変数
position = 0
entry_price = 0
capital = initial_capital
portfolio_values = []
trades = []
environments = []
for idx in range(1, len(df_clean)):
current_data = df_clean.iloc[:idx+1]
latest_price = df_clean.iloc[idx]['Close']
# ハイブリッドシグナル生成
signal, confidence, reason, environment = self.generate_hybrid_signal(current_data)
environments.append((df_clean.index[idx], environment))
# シグナル実行(トレンド市場と レンジ市場で異なるロジック)
if environment in ['UPTREND', 'DOWNTREND'] and signal == 1 and position == 0 and confidence >= 65:
# トレンドフォロー買い
stop_loss_price = latest_price - (df_clean.iloc[idx]['ATR'] * 2)
risk_amount = capital * POSITION_SIZE_MULTIPLIER
position = int(risk_amount / (latest_price - stop_loss_price))
if position > 0:
entry_price = latest_price
trades.append({
'date': df_clean.index[idx],
'strategy': 'TREND_FOLLOWING',
'type': 'BUY',
'price': latest_price,
'quantity': position,
'confidence': confidence
})
elif environment == 'RANGING' and signal == 1 and position == 0 and confidence >= 60:
# 平均回帰買い
stop_loss_price = df_clean.iloc[idx-1]['Low'] * 0.99
risk_amount = capital * MEAN_REVERSION_RISK_PERCENT
position = int(risk_amount / (latest_price - stop_loss_price))
if position > 0:
entry_price = latest_price
trades.append({
'date': df_clean.index[idx],
'strategy': 'MEAN_REVERSION',
'type': 'BUY',
'price': latest_price,
'quantity': position,
'confidence': confidence
})
# 売却判定(簡略化)
elif position > 0 and signal == -1:
capital = position * latest_price
trades.append({
'date': df_clean.index[idx],
'type': 'SELL',
'price': latest_price,
'quantity': position,
'pnl': (latest_price - entry_price) * position
})
position = 0
# ポートフォリオ価値
portfolio_value = capital + (position * latest_price)
portfolio_values.append(portfolio_value)
return {
'trades': trades,
'environments': environments,
'final_value': portfolio_values[-1] if portfolio_values else initial_capital,
'total_return': (portfolio_values[-1] - initial_capital) / initial_capital * 100,
'portfolio_history': portfolio_values
}
# ====== メイン実行 ======
def main_hybrid():
print("=" * 80)
print("【ハイブリッド戦略 - 市場環境自動判定システム】")
print("=" * 80)
print(f"\n対象銘柄: {STOCK_CODE}")
print(f"初期資本: ¥{HYBRID_INITIAL_CAPITAL:,}\n")
df = yf.download(STOCK_CODE, start=START_DATE, end=END_DATE, progress=False)
hybrid_engine = HybridStrategyEngine()
results = hybrid_engine.backtest_hybrid(df, HYBRID_INITIAL_CAPITAL)
print(f"【バックテスト結果】\n")
print(f"最終ポートフォリオ価値: ¥{results['final_value']:,.0f}")
print(f"総リターン: {results['total_return']:.2f}%")
print(f"総売買数: {len(results['trades'])}")
# 環境別の取引数を集計
trend_trades = sum(1 for t in results['trades'] if t.get('strategy') == 'TREND_FOLLOWING')
reversion_trades = sum(1 for t in results['trades'] if t.get('strategy') == 'MEAN_REVERSION')
print(f"\nトレンドフォロー取引: {trend_trades}件")
print(f"平均回帰取引: {reversion_trades}件")
if __name__ == "__main__":
main_hybrid()
コード解説:
- MarketEnvironmentDetector: ADXとDI(Directional Indicator)を使用して、トレンド相場かレンジ相場かを自動判定
- HybridStrategyEngine: 判定結果に応じて、トレンドフォロー戦略または平均回帰戦略を動的に切り替え
- backtest_hybrid: 各時点での市場環境と使用した戦略を記録し、後から検証可能に設計
よくあるエラーと対処法
エラー1:ADXが計算できない(NaNが大量に出る)
症状: df['ADX'] がすべてNaNになっている
原因: True Range(TR)の計算で、前日の終値が不足していることがある。
対処法:
# データに十分なバッファを持たせる
df = df.iloc[ADX_PERIOD:] # 最初の数行を削除
エラー2:バックテスト結果が異常に良い(オーバーフィッティング)
症状: テスト期間では年間利益50%以上が出ているが、実運用では-5%
原因: パラメータをテスト期間に過度に最適化している。
対処法:
- 複数の異なる期間でテストを実施
- パラメータをより広めに設定(汎用性を重視)
- ウォークフォワード分析を実施
def walkforward_analysis(df, strategy_engine, period_length=252):
"""ウォークフォワード分析(複数期間での検証)"""
results = []
for i in range(period_length, len(df), period_length):
test_start = i - period_length
test_end = i
test_df = df.iloc[test_start:test_end]
result = backtest_trend_following(test_df, strategy_engine, INITIAL_CAPITAL)
results.append(result['total_return'])
avg_return = np.mean(results)
std_return = np.std(results)
print(f"平均リターン: {avg_return:.2f}% ± {std_return:.2f}%")
エラー3:ポジションサイズが0になる
症状: シグナルが出ているのに、ポジションが全く建たない
原因: リスク許容額がストップロス幅に対して小さすぎる。
対処法:
- リスク許容額を増やす
- ストップロスの幅を広げる
- 最小ポジションサイズの制限を設ける
エラー4:戦略の切り替えが頻繁で、取引が乱発する
症状: トレンド/レンジの判定が毎日変わり、無駄な取引が増える
原因: 環境判定の閾値(ADX=25)が低すぎる。
対処法:
ADX_THRESHOLD = 30 # 25 → 30 に引き上げ(より強いトレンド確認)
ADX_HYSTERESIS = 5 # ヒステリシス(一度トレンド判定したら、ADXが20以下になるまで維持)
まとめ
本記事では、トレンドフォロー戦略と平均回帰戦略という、株式アルゴリズム取引における2つの対極の投資戦略をPythonで実装する具体的な方法を解説しました。
要点を整理します。
- トレンドフォロー戦略は、トレンド相場で最大の効力を発揮し、MA、MACD、ADXの複合指標で実装される
- 平均回帰戦略は、レンジ相場(持ち合い)で機能し、ボリンジャーバンド、RSI、統計的パーセンタイルで実装される
- 市場環境を自動判定するエンジンを構築することで、トレンド相場とレンジ相場に応じた戦略の自動切り替えが可能
- ポジションサイズはATRやリスク許容額に基づいて動的に計算し、過度なレバレッジを避ける
- 複数の指標による「多層的なシグナル確認」により、誤シグナルを大幅に削減できる
- ウォークフォワード分析や複数期間でのテストを実施することで、パラメータのオーバーフィッティングを防止
- バックテスト結果と実運用結果の乖離を最小化するために、スリップ、手数料、税金を事前に組み込む
個人投資家がアルゴリズム取引で生き残るために最も重要なのは、「単一の戦略に固執しない柔軟性」です。市場環境は常に変化し、同じ戦略が永遠に機能し続けることはあり得ません。本記事で学んだハイブリッド戦略のように、複数の戦略を組み合わせ、自動的に環境に適応するシステムを構築することが、長期的な競争力につながるのです。次のステップとして、自分の運用スタイルに合わせてパラメータをカスタマイズし、デモ環境で十分にテストした後、実運用を開始することを強くお勧めします。

