【逆張り】MACDのダイバージェンスをPythonで自動検知するロジック

Python実装・コード

※本記事のコードや情報は執筆時点の仕様に基づいています。投資は自己責任であり、必ずデモ環境や少額資金でテストした上で運用してください。

MACD(Moving Average Convergence Divergence:移動平均収束拡散法)を使ったテクニカル分析に取り組み、次のステップとしてダイバージェンス(Divergence:逆行現象)の検知に挑戦しようとしている方は多いはずです。

ダイバージェンスは、価格とオシレーターの方向が食い違う現象であり、トレンド転換の予兆として高い有用性を持ちます。裁量トレードの世界では古くから重視されてきたシグナルです。

しかし、チャートを目視で確認してダイバージェンスを判断するのは、時間がかかる上に主観が入りやすいという問題があります。「あれはダイバージェンスだったのか」と後から悩むケースも少なくありません。

原因は、「直近の高値・安値をどう定義するか」「MACDの極値をどう検出するか」というロジック面の曖昧さにあります。定義が曖昧なまま目視に頼ると、検知の再現性がなくなります。

本記事では、Pythonの極値検出アルゴリズムを使い、MACDのダイバージェンスを自動検知するロジックを実装します。価格の高値・安値とMACDヒストグラムの極値を数値的に比較し、強気・弱気のダイバージェンスを判定するコードを提供します。

コードはすべてコピペで動作します。銘柄コードやパラメータを設定エリアで変更するだけで、任意の銘柄に適用できる設計です。

MACDダイバージェンスの理論

ダイバージェンスの定義と2つの種類

ダイバージェンスとは、価格の動きとテクニカル指標の動きが逆方向に進む現象です。MACDにおけるダイバージェンスは以下の2種類に分類されます。

* 弱気ダイバージェンス(Bearish Divergence):価格が高値を切り上げているのに、MACDが高値を切り下げている状態。上昇トレンドの勢いが弱まっていることを示し、下落転換の予兆となる

* 強気ダイバージェンス(Bullish Divergence):価格が安値を切り下げているのに、MACDが安値を切り上げている状態。下落トレンドの勢いが弱まっていることを示し、上昇転換の予兆となる

ダイバージェンスは「トレンドの終わり」を示唆するものであり、「即座に反転する」ことを保証するものではありません。過信してはいけません。

極値検出がダイバージェンス判定の鍵

自動検知のポイントは、価格とMACDそれぞれの局所的な高値(ローカルマキシマム)と安値(ローカルミニマム)を正確に抽出することです。

本記事ではscipy.signal.argrelextremaを使用します。この関数は、前後order個の点と比較して極値を判定します。orderの値が小さいと細かなノイズまで拾い、大きいと重要な転換点を見逃します。日足データではorder=515が実用的な範囲です。

【コピペOK】MACDダイバージェンス自動検知コード

まず必要なライブラリをインストールしてください。


pip install yfinance pandas numpy matplotlib scipy japanize-matplotlib

以下がメインの検知コードです。


from typing import List, Tuple

import japanize_matplotlib  # noqa: F401
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import yfinance as yf
from scipy.signal import argrelextrema

# ==============================
# 設定エリア
# ==============================
TICKER: str = "7203.T"              # 分析対象の銘柄コード
START_DATE: str = "2023-01-01"      # データ取得開始日
END_DATE: str = "2025-01-01"        # データ取得終了日
MACD_FAST: int = 12                 # MACD短期EMA期間
MACD_SLOW: int = 26                 # MACD長期EMA期間
MACD_SIGNAL: int = 9                # シグナル線期間
EXTREMA_ORDER: int = 10             # 極値検出の前後比較幅(営業日)
LOOKBACK_BARS: int = 80             # ダイバージェンス比較の最大間隔(営業日)

# ==============================
# データ取得関数
# ==============================
def fetch_ohlc(ticker: str, start: str, end: str) -> pd.DataFrame:
    '\"yfinanceからOHLCデータを取得する'\"
    df: pd.DataFrame = yf.download(
        ticker, start=start, end=end, auto_adjust=True, progress=False
    )
    if df.empty:
        raise ValueError(f"データ取得に失敗しました: {ticker}")
    df.columns = [c[0] if isinstance(c, tuple) else c for c in df.columns]
    return df


# ==============================
# MACD算出関数
# ==============================
def calc_macd(
    close: pd.Series, fast: int, slow: int, signal: int
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    '\"MACD線・シグナル線・ヒストグラムを算出する'\"
    ema_fast: pd.Series = close.ewm(span=fast, adjust=False).mean()
    ema_slow: pd.Series = close.ewm(span=slow, adjust=False).mean()
    macd_line: pd.Series = ema_fast - ema_slow
    signal_line: pd.Series = macd_line.ewm(span=signal, adjust=False).mean()
    histogram: pd.Series = macd_line - signal_line
    return macd_line, signal_line, histogram


# ==============================
# 極値検出関数
# ==============================
def find_local_maxima(series: pd.Series, order: int) -> np.ndarray:
    '\"ローカルマキシマムのインデックスを返す'\"
    values: np.ndarray = series.values
    indices: Tuple[np.ndarray, ...] = argrelextrema(values, np.greater, order=order)
    return indices[0]


def find_local_minima(series: pd.Series, order: int) -> np.ndarray:
    '\"ローカルミニマムのインデックスを返す'\"
    values: np.ndarray = series.values
    indices: Tuple[np.ndarray, ...] = argrelextrema(values, np.less, order=order)
    return indices[0]


# ==============================
# ダイバージェンス検知関数
# ==============================
def detect_bearish_divergence(
    close: pd.Series,
    macd_line: pd.Series,
    order: int,
    lookback: int,
) -> List[dict]:
    '\"弱気ダイバージェンスを検知する'\"
    price_peaks: np.ndarray = find_local_maxima(close, order)
    macd_peaks: np.ndarray = find_local_maxima(macd_line, order)
    signals: List[dict] = []

    for i in range(1, len(price_peaks)):
        curr_idx: int = price_peaks[i]
        prev_idx: int = price_peaks[i - 1]

        if curr_idx - prev_idx > lookback:
            continue

        price_higher: bool = close.iloc[curr_idx] > close.iloc[prev_idx]

        macd_at_curr: float = _nearest_extrema_value(macd_peaks, curr_idx, macd_line, order)
        macd_at_prev: float = _nearest_extrema_value(macd_peaks, prev_idx, macd_line, order)

        if macd_at_curr is None or macd_at_prev is None:
            continue

        macd_lower: bool = macd_at_curr < macd_at_prev

        if price_higher and macd_lower:
            signals.append({
                "type": "bearish",
                "date": close.index[curr_idx],
                "price_prev_idx": prev_idx,
                "price_curr_idx": curr_idx,
                "price_prev": close.iloc[prev_idx],
                "price_curr": close.iloc[curr_idx],
                "macd_prev": macd_at_prev,
                "macd_curr": macd_at_curr,
            })

    return signals


def detect_bullish_divergence(
    close: pd.Series,
    macd_line: pd.Series,
    order: int,
    lookback: int,
) -> List[dict]:
    '\"強気ダイバージェンスを検知する'\"
    price_troughs: np.ndarray = find_local_minima(close, order)
    macd_troughs: np.ndarray = find_local_minima(macd_line, order)
    signals: List[dict] = []

    for i in range(1, len(price_troughs)):
        curr_idx: int = price_troughs[i]
        prev_idx: int = price_troughs[i - 1]

        if curr_idx - prev_idx > lookback:
            continue

        price_lower: bool = close.iloc[curr_idx] < close.iloc[prev_idx]

        macd_at_curr: float = _nearest_extrema_value(macd_troughs, curr_idx, macd_line, order)
        macd_at_prev: float = _nearest_extrema_value(macd_troughs, prev_idx, macd_line, order)

        if macd_at_curr is None or macd_at_prev is None:
            continue

        macd_higher: bool = macd_at_curr > macd_at_prev

        if price_lower and macd_higher:
            signals.append({
                "type": "bullish",
                "date": close.index[curr_idx],
                "price_prev_idx": prev_idx,
                "price_curr_idx": curr_idx,
                "price_prev": close.iloc[prev_idx],
                "price_curr": close.iloc[curr_idx],
                "macd_prev": macd_at_prev,
                "macd_curr": macd_at_curr,
            })

    return signals


def _nearest_extrema_value(
    extrema_indices: np.ndarray,
    target_idx: int,
    series: pd.Series,
    tolerance: int,
) -> float:
    '\"対象インデックスに最も近い極値の値を返す'\"
    if len(extrema_indices) == 0:
        return None
    distances: np.ndarray = np.abs(extrema_indices - target_idx)
    nearest_pos: int = int(np.argmin(distances))
    if distances[nearest_pos] > tolerance:
        return None
    return float(series.iloc[extrema_indices[nearest_pos]])


# ==============================
# 可視化関数
# ==============================
def plot_divergence(
    df: pd.DataFrame,
    macd_line: pd.Series,
    signal_line: pd.Series,
    histogram: pd.Series,
    bearish_signals: List[dict],
    bullish_signals: List[dict],
) -> None:
    '\"価格チャートとMACDにダイバージェンスを描画する'\"
    fig, (ax1, ax2) = plt.subplots(
        2, 1, figsize=(14, 9), sharex=True,
        gridspec_kw={"height_ratios": [2, 1]},
    )

    # 価格チャート
    ax1.plot(df.index, df["Close"], color="black", linewidth=0.8, label="終値")
    ax1.set_title(f"{TICKER} 価格とMACDダイバージェンス")
    ax1.set_ylabel("価格")

    for sig in bearish_signals:
        ax1.annotate(
            "▼弱気", xy=(sig["date"], sig["price_curr"]),
            fontsize=8, color="red", ha="center",
        )
        ax1.plot(
            [df.index[sig["price_prev_idx"]], df.index[sig["price_curr_idx"]]],
            [sig["price_prev"], sig["price_curr"]],
            color="red", linestyle="--", linewidth=1.2,
        )

    for sig in bullish_signals:
        ax1.annotate(
            "▲強気", xy=(sig["date"], sig["price_curr"]),
            fontsize=8, color="blue", ha="center",
        )
        ax1.plot(
            [df.index[sig["price_prev_idx"]], df.index[sig["price_curr_idx"]]],
            [sig["price_prev"], sig["price_curr"]],
            color="blue", linestyle="--", linewidth=1.2,
        )

    ax1.legend(loc="upper left")
    ax1.grid(True, alpha=0.3)

    # MACDチャート
    ax2.plot(df.index, macd_line, label="MACD", linewidth=0.8)
    ax2.plot(df.index, signal_line, label="シグナル", linewidth=0.8)
    colors: list = ["green" if v >= 0 else "red" for v in histogram]
    ax2.bar(df.index, histogram, color=colors, alpha=0.4, width=1.0, label="ヒストグラム")

    for sig in bearish_signals:
        ax2.plot(
            [df.index[sig["price_prev_idx"]], df.index[sig["price_curr_idx"]]],
            [sig["macd_prev"], sig["macd_curr"]],
            color="red", linestyle="--", linewidth=1.2,
        )

    for sig in bullish_signals:
        ax2.plot(
            [df.index[sig["price_prev_idx"]], df.index[sig["price_curr_idx"]]],
            [sig["macd_prev"], sig["macd_curr"]],
            color="blue", linestyle="--", linewidth=1.2,
        )

    ax2.set_ylabel("MACD")
    ax2.legend(loc="upper left")
    ax2.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig("macd_divergence.png", dpi=150)
    plt.show()


# ==============================
# メイン処理
# ==============================
if __name__ == "__main__":
    # データ取得
    ohlc_df: pd.DataFrame = fetch_ohlc(TICKER, START_DATE, END_DATE)
    close: pd.Series = ohlc_df["Close"]

    # MACD算出
    macd_line, signal_line, histogram = calc_macd(
        close, MACD_FAST, MACD_SLOW, MACD_SIGNAL
    )
    ohlc_df["macd"] = macd_line
    ohlc_df["signal"] = signal_line
    ohlc_df["histogram"] = histogram

    # ダイバージェンス検知
    bearish: List[dict] = detect_bearish_divergence(
        close, macd_line, EXTREMA_ORDER, LOOKBACK_BARS
    )
    bullish: List[dict] = detect_bullish_divergence(
        close, macd_line, EXTREMA_ORDER, LOOKBACK_BARS
    )

    # 結果出力
    print("=" * 55)
    print("MACDダイバージェンス検知結果")
    print("=" * 55)
    print(f"分析期間        : {START_DATE} ~ {END_DATE}")
    print(f"弱気ダイバージェンス: {len(bearish)}件")
    print(f"強気ダイバージェンス: {len(bullish)}件")
    print("=" * 55)

    for sig in bearish:
        print(f"[弱気] {sig['date'].strftime('%Y-%m-%d')} "
              f"価格: {sig['price_prev']:.0f}→{sig['price_curr']:.0f} "
              f"MACD: {sig['macd_prev']:.2f}→{sig['macd_curr']:.2f}")

    for sig in bullish:
        print(f"[強気] {sig['date'].strftime('%Y-%m-%d')} "
              f"価格: {sig['price_prev']:.0f}→{sig['price_curr']:.0f} "
              f"MACD: {sig['macd_prev']:.2f}→{sig['macd_curr']:.2f}")

    # 可視化
    plot_divergence(ohlc_df, macd_line, signal_line, histogram, bearish, bullish)

コードの処理フロー解説

上記のコードは、以下の5ステップで構成されています。

* ステップ1(データ取得)fetch_ohlcでyfinanceからOHLC(始値・高値・安値・終値)データを取得する

* ステップ2(MACD算出)calc_macdでEMA(Exponential Moving Average:指数移動平均)ベースのMACD線・シグナル線・ヒストグラムを算出する

* ステップ3(極値検出)scipy.signal.argrelextremaを使い、価格とMACDそれぞれの局所的な高値・安値のインデックスを抽出する

* ステップ4(ダイバージェンス判定):連続する2つの極値について、価格の方向とMACDの方向が逆行しているかを比較し、条件を満たすペアをシグナルとして記録する

* ステップ5(可視化):価格チャートとMACDチャートの2段構成で、検知されたダイバージェンスを破線で描画する

EXTREMA_ORDERを小さくすると検知感度が上がり、大きくするとノイズが減ります。まずはデフォルトの10で試し、結果を見ながら調整してください。

検知結果の読み解き方と実践的な注意点

シグナルの信頼度を判断する3つの基準

自動検知されたダイバージェンスがすべて有効なわけではありません。以下の3つの基準でフィルタリングしてください。

* 極値間の距離:2つの極値が近すぎる(20日未満)場合はノイズの可能性が高い。40日〜80日の間隔があるシグナルを優先する

* MACDの乖離幅:MACDの切り下げ幅(または切り上げ幅)が微小な場合は信頼度が低い。前回極値との差が標準偏差の0.5倍以上あるものを目安にする

* トレンドの強さ:強いトレンド中のダイバージェンスは「だまし」になりやすい。出来高の減少を伴っているかどうかも併せて確認する

ダイバージェンス単体でエントリーするのは危険です。必ず他の指標やプライスアクションと組み合わせてください。

ヒストグラムとMACD線の使い分け

本コードではMACD線の極値でダイバージェンスを検知しています。一方、MACDヒストグラムの極値で判定する手法もあります。

ヒストグラムベースの検知はシグナルが早く出る傾向がありますが、だましも多くなります。detect_bearish_divergence関数の引数macd_linehistogramに差し替えるだけで切り替え可能です。両方の結果を比較し、自分のトレードスタイルに合うほうを選択してください。

【コピペOK】ダイバージェンス発生後の勝率検証コード

検知されたシグナルが実際に有効だったかどうかを検証する追加コードです。メインコードのif __name__ == "__main__":ブロック末尾に追加してください。


    # ==============================
    # シグナル検証(追加分析)
    # ==============================
    FORWARD_DAYS: List[int] = [5, 10, 20]

    def evaluate_signals(
        signals: List[dict], close: pd.Series, forward_days: List[int], signal_type: str
    ) -> pd.DataFrame:
        '\"シグナル発生後のリターンを検証する'\"
        rows: List[dict] = []
        for sig in signals:
            idx: int = sig["price_curr_idx"]
            row: dict = {"date": sig["date"]}
            for fd in forward_days:
                if idx + fd < len(close):
                    future_ret: float = (close.iloc[idx + fd] - close.iloc[idx]) / close.iloc[idx]
                    row[f"{fd}日後リターン(%)"] = round(future_ret * 100, 2)
                else:
                    row[f"{fd}日後リターン(%)"] = None
            rows.append(row)
        eval_df: pd.DataFrame = pd.DataFrame(rows)
        return eval_df

    print("\n--- 弱気ダイバージェンス後のリターン ---")
    if bearish:
        bear_eval: pd.DataFrame = evaluate_signals(bearish, close, FORWARD_DAYS, "bearish")
        print(bear_eval.to_string(index=False))
        for fd in FORWARD_DAYS:
            col: str = f"{fd}日後リターン(%)"
            valid: pd.Series = bear_eval[col].dropna()
            if len(valid) > 0:
                win: int = int((valid < 0).sum())
                print(f"  {fd}日後 下落率: {win}/{len(valid)} ({win/len(valid)*100:.1f}%)")
    else:
        print("  検知なし")

    print("\n--- 強気ダイバージェンス後のリターン ---")
    if bullish:
        bull_eval: pd.DataFrame = evaluate_signals(bullish, close, FORWARD_DAYS, "bullish")
        print(bull_eval.to_string(index=False))
        for fd in FORWARD_DAYS:
            col: str = f"{fd}日後リターン(%)"
            valid: pd.Series = bull_eval[col].dropna()
            if len(valid) > 0:
                win: int = int((valid > 0).sum())
                print(f"  {fd}日後 上昇率: {win}/{len(valid)} ({win/len(valid)*100:.1f}%)")
    else:
        print("  検知なし")

検証結果で弱気ダイバージェンス後の5日後下落率が60%以上であれば、一定の有効性があると判断できます。50%前後であればランダムと変わらないため、パラメータ調整やフィルタ追加が必要です。

よくあるエラーと対処法

検知結果が0件になる

EXTREMA_ORDERが大きすぎると極値が検出されず、ダイバージェンスの判定対象がなくなります。データ期間が短い場合にも同様の現象が起きます。

以下を試してください。

* EXTREMA_ORDER5に下げて再実行する

* START_DATEを2年以上前に設定してデータ量を増やす

* LOOKBACK_BARS120程度に広げて、比較対象の範囲を拡大する

大量のだましシグナルが検知される

EXTREMA_ORDERが小さすぎると、ノイズレベルの小さな山・谷をすべて極値として拾ってしまいます。

以下を試してください。

* EXTREMA_ORDER1520に引き上げる

* 検知後のフィルタとして、MACDの前回極値との差が一定値未満のシグナルを除外するロジックを追加する

* 価格の極値間の変動幅がATR(Average True Range:平均真の変動幅)の1倍未満のものを除外する

argrelextremaの結果がタプルでインデックスエラーになる

scipy.signal.argrelextremaは結果をタプルで返します。indices[0]で最初の要素を取得する必要があります。コード内では対処済みですが、独自に改変した場合にこのエラーが発生しやすくなります。

戻り値をprint(type(indices), indices)で確認し、indices[0]がnumpy配列であることを検証してください。空配列の場合は極値が見つかっていないため、EXTREMA_ORDERの値を調整する必要があります。

まとめ

この記事では、MACDのダイバージェンスをPythonで自動検知するロジックと、検知結果の検証方法を解説しました。

要点を整理します。

* ダイバージェンスは価格とMACDの方向が逆行する現象であり、弱気(価格↑・MACD↓)と強気(価格↓・MACD↑)の2種類がある

* scipy.signal.argrelextremaで局所的な極値を検出し、連続する2つの極値の方向比較でダイバージェンスを判定する

* EXTREMA_ORDERのパラメータは日足で515が実用的な範囲であり、感度とノイズのトレードオフを意識して調整する

* ダイバージェンス単体での売買判断は危険であり、出来高やプライスアクションとの併用が必須

* 検知後5日〜20日のリターンを検証し、有効性が60%以上あるかを定量的に確認する

次のステップとして、RSI(Relative Strength Index:相対力指数)やストキャスティクスなど他のオシレーターでも同じ極値検出ロジックを適用し、複数指標のダイバージェンスが同時に発生する「コンフルエンス(Confluence:合流)」を検知する仕組みに拡張してみてください。

複数指標でダイバージェンスが重なるポイントは信頼度が格段に高まります。detect_bearish_divergence関数の構造をそのまま流用し、入力するシリーズをRSI等に差し替えるだけで実装できます。

タイトルとURLをコピーしました