【PythonでAI投資の始め方】自作アルゴリズムで感情を排除した取引ツールの作り方

基礎知識・戦略

※本記事のコードや情報は執筆時点の仕様に基づいています。投資は自己責任であり、必ずデモ環境や少額資金でテストした上で運用してください。

AI投資という言葉は、今やマーケティング用語として乱用されています。多くのAI投資サービスは、その内部アルゴリズムを「ブラックボックス」として隠蔽し、高い手数料を請求しています。しかし、個人投資家にとって本当に必要なのは、「自分が理解できる透明なアルゴリズム」を自分自身で構築することです。

感情に左右されない投資判断を実現するには、複雑な機械学習モデルは不要です。むしろ、シンプルで理解可能なルールベースのシステムのほうが、長期的には安定した利益を生み出します。本記事では、Pythonを使って、個人投資家が完全に理解でき、カスタマイズできるAI投資アルゴリズムを構築するための、具体的なステップを提供します。

これは「ブラックボックスのAI投資に依存する」ことからの解放であり、「自分の投資判断を自分で制御する」という本来あるべき投資のあり方への回帰です。

「AI投資」の本質とブラックボックスの危険性

AI投資サービスが提供しているものの多くは、以下の特徴を持っています。

内部アルゴリズムが公開されていない(ブラックボックス)→ なぜそのような判断をしたのか理解できない
高い手数料が設定されている(年1~2%)→ 30年で資産の30~60%が手数料で消える
パフォーマンスが市場平均を上回ることが少ない → 実は、個人投資家が手動で投資したほうが利益が出ることも多い
過去のデータで高い成績が示されている → バックテスト結果は過学習を隠蔽している可能性が高い

対して、個人投資家が自分で構築するシステムの利点は以下の通りです。

完全に透明 → 自分が書いたコードなので、なぜそのような判断をしたのか完全に理解できる
コスト無料 → yfinanceやPythonは無料なので、手数料ゼロで運用可能
理解可能 → 複雑な機械学習ではなく、シンプルなルールベースのシステム
長期的に安定 → 単純なシステムほど、市場環境の変化に対応しやすい

特性AI投資サービス自作アルゴリズム
手数料年1~2%0%
透明性ブラックボックス完全に理解可能
カスタマイズ性ほぼ不可能完全自由
期待リターン市場平均程度市場平均+αの可能性
セキュリティ高(企業が管理)中程度(自分が管理)

AI投資を「自作」するメリット

メリット1:感情を完全に排除できる

自動化されたルールベースのシステムは、相場が上昇しても下降しても、同じルールに従って判断します。人間のように「もう少し待ってみよう」「反発するかもしれない」という感情的な判断がありません。

メリット2:手数料がゼロ

AI投資サービスの年1~2%の手数料は、30年という長期間では資産の30~60%という莫大な機会損失になります。自作システムは、yfinanceとPythonさえあれば、追加コストは実質ゼロです。

メリット3:市場環境に応じた動的な調整が可能

AI投資サービスは、一度設定されたアルゴリズムを簡単には変更できません。しかし、自作システムなら、市場環境の変化に応じて、パラメータを柔軟に調整できます。

メリット4:完全な透明性と信頼性

あなたが書いたコードなので、なぜそのような判断をしたのか、完全に理解できます。ブラックボックスの「なぜ?」という疑問が一切ありません。

【コピペOK】個人投資家向けAI投資システムの完全実装

では、Pythonを使って、個人投資家が実装可能な「感情を排除したAI投資アルゴリズム」を構築します。このシステムの設計思想は「シンプルさ」と「理解可能性」です。

import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import requests
import json
from pathlib import Path
import logging

# ==============================
# ロギング設定
# ==============================
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    handlers=[
        logging.FileHandler('ai_investment.log'),
        logging.StreamHandler()
    ]
)

# ==============================
# 設定エリア
# ==============================
class Config:
    """システム全体の設定"""

    # ポートフォリオ構成(分散投資)
    PORTFOLIO = {
        '7203.T': 0.20,      # トヨタ 20%
        '7201.T': 0.20,      # 日産 20%
        '8058.T': 0.20,      # 三菱UFJ 20%
        '9984.T': 0.20,      # ソフトバンク 20%
        '6758.T': 0.20,      # ソニー 20%
    }

    # 運用パラメータ
    INITIAL_CAPITAL = 1000000        # 初期資金100万円
    MONTHLY_CONTRIBUTION = 50000     # 月次積立5万円

    # テクニカル指標パラメータ
    MA_SHORT = 20
    MA_LONG = 50
    RSI_PERIOD = 14

    # リスク管理パラメータ
    MAX_DRAWDOWN = 0.20              # 最大ドローダウン20%
    STOP_LOSS = 0.05                 # 損切り5%

    # LINE通知設定
    LINE_TOKEN = "YOUR_LINE_NOTIFY_TOKEN"
    LINE_API_URL = "https://notify-api.line.me/api/notify"

    # ファイル設定
    STATE_FILE = "portfolio_state.json"
    PERFORMANCE_LOG = "performance_history.csv"

# ==============================
# ステップ1:ポートフォリオ状態管理
# ==============================
class PortfolioManager:
    """ポートフォリオの状態を管理"""

    def __init__(self, config=Config):
        """
        Args:
            config: システム設定
        """
        self.config = config
        self.state = self._load_state()
        self.positions = {}      # 現在のポジション
        self.performance = []    # パフォーマンス履歴

    def _load_state(self):
        """状態ファイルから状態を復元"""
        state_file = Path(self.config.STATE_FILE)

        if state_file.exists():
            try:
                with open(state_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                logging.error(f"状態読み込みエラー: {e}")

        # デフォルト状態
        return {
            'created_at': datetime.now().isoformat(),
            'total_invested': self.config.INITIAL_CAPITAL,
            'current_value': self.config.INITIAL_CAPITAL,
            'cash': self.config.INITIAL_CAPITAL,
            'positions': {},
            'last_rebalance': None,
        }

    def _save_state(self):
        """状態をファイルに保存"""
        try:
            with open(self.config.STATE_FILE, 'w', encoding='utf-8') as f:
                json.dump(self.state, f, ensure_ascii=False, indent=2, default=str)
        except Exception as e:
            logging.error(f"状態保存エラー: {e}")

    def fetch_current_prices(self):
        """全銘柄の現在価格を取得"""
        prices = {}

        for symbol in self.config.PORTFOLIO.keys():
            try:
                df = yf.download(symbol, period='1d', progress=False)
                prices[symbol] = df['Close'].iloc[-1]
            except Exception as e:
                logging.error(f"価格取得エラー({symbol}): {e}")

        return prices

    def calculate_portfolio_value(self, prices):
        """ポートフォリオ総価値を計算"""
        value = self.state['cash']

        for symbol, shares in self.state['positions'].items():
            if symbol in prices:
                value += shares * prices[symbol]

        return value

    def should_rebalance(self):
        """リバランスが必要かチェック"""
        if self.state['last_rebalance'] is None:
            return True

        last_rebalance_date = datetime.fromisoformat(self.state['last_rebalance']).date()
        today = datetime.now().date()

        # 月初にリバランス
        if today.day == 1 and (today - last_rebalance_date).days >= 1:
            return True

        return False

    def rebalance_portfolio(self, prices):
        """ポートフォリオをリバランス"""
        print("[進行中] ポートフォリオをリバランス中...")

        total_value = self.calculate_portfolio_value(prices)

        for symbol, target_weight in self.config.PORTFOLIO.items():
            target_value = total_value * target_weight
            current_value = (self.state['positions'].get(symbol, 0) * prices.get(symbol, 0))
            difference = target_value - current_value

            if abs(difference) > 10000:  # 1万円以上のズレ
                shares_to_trade = int(difference / prices[symbol])

                self.state['positions'][symbol] = (
                    self.state['positions'].get(symbol, 0) + shares_to_trade
                )

                self.state['cash'] -= (shares_to_trade * prices[symbol])

                action = "買い増し" if shares_to_trade > 0 else "売却"
                print(f"{symbol}: {action} {abs(shares_to_trade)}株")

        self.state['last_rebalance'] = datetime.now().isoformat()
        self._save_state()

        print("[成功] リバランス完了")

    def add_monthly_contribution(self, amount, prices):
        """月次積立を実行"""
        print(f"[情報] 月次積立 ¥{amount:,.0f} を実行中...")

        self.state['cash'] += amount
        self.state['total_invested'] += amount

        # 目標ウェイトで配分
        for symbol, weight in self.config.PORTFOLIO.items():
            contribution_amount = amount * weight
            shares = int(contribution_amount / prices.get(symbol, 1))

            self.state['positions'][symbol] = (
                self.state['positions'].get(symbol, 0) + shares
            )
            self.state['cash'] -= (shares * prices[symbol])

        self._save_state()

# ==============================
# ステップ2:テクニカル分析エンジン
# ==============================
class TechnicalAnalyzer:
    """テクニカル指標の計算と分析"""

    def __init__(self, config=Config):
        """
        Args:
            config: システム設定
        """
        self.config = config
        self.indicators = {}

    def analyze_symbol(self, symbol, period='1y'):
        """
        銘柄を分析

        Returns:
            dict: 分析結果
        """
        try:
            # データ取得
            df = yf.download(symbol, period=period, progress=False)

            if df.empty:
                return None

            df = df.fillna(method='ffill')

            # 指標計算
            df['MA_SHORT'] = df['Close'].rolling(window=self.config.MA_SHORT).mean()
            df['MA_LONG'] = df['Close'].rolling(window=self.config.MA_LONG).mean()
            df['RSI'] = self._calculate_rsi(df['Close'])

            # 現在の状況
            current = df.iloc[-1]
            previous = df.iloc[-2]

            return {
                'symbol': symbol,
                'price': current['Close'],
                'ma_short': current['MA_SHORT'],
                'ma_long': current['MA_LONG'],
                'rsi': current['RSI'],
                'trend': 'UP' if current['MA_SHORT'] > current['MA_LONG'] else 'DOWN',
                'signal': self._generate_signal(current, previous),
            }

        except Exception as e:
            logging.error(f"分析エラー({symbol}): {e}")
            return None

    def _calculate_rsi(self, series, period=None):
        """RSIを計算"""
        if period is None:
            period = self.config.RSI_PERIOD

        delta = series.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def _generate_signal(self, current, previous):
        """
        シグナルを生成

        ロジック:
        1. ゴールデンクロス(短期MA > 長期MA)で買い
        2. デッドクロス(短期MA < 長期MA)で売り
        3. RSIの極端な値は確認
        """
        if pd.isna(current['MA_SHORT']) or pd.isna(current['RSI']):
            return 'HOLD'

        # ゴールデンクロス
        if (previous['MA_SHORT'] <= previous['MA_LONG'] and 
            current['MA_SHORT'] > current['MA_LONG'] and
            current['RSI'] < 70):
            return 'BUY'

        # デッドクロス
        elif (previous['MA_SHORT'] >= previous['MA_LONG'] and 
              current['MA_SHORT'] < current['MA_LONG'] and
              current['RSI'] > 30):
            return 'SELL'

        return 'HOLD'

# ==============================
# ステップ3:リスク管理エンジン
# ==============================
class RiskManager:
    """ポートフォリオのリスクを管理"""

    def __init__(self, config=Config):
        """
        Args:
            config: システム設定
        """
        self.config = config

    def check_stop_loss(self, symbol, entry_price, current_price):
        """
        損切り判定

        Args:
            symbol: 銘柄コード
            entry_price: 買値
            current_price: 現在価格

        Returns:
            bool: 損切り対象ならTrue
        """
        loss_ratio = (current_price - entry_price) / entry_price

        if loss_ratio < -self.config.STOP_LOSS:
            logging.warning(f"{symbol}: 損切り判定 ({loss_ratio*100:.1f}%)")
            return True

        return False

    def check_portfolio_health(self, portfolio_value, peak_value):
        """
        ポートフォリオの健全性をチェック

        Args:
            portfolio_value: 現在のポートフォリオ価値
            peak_value: 過去の最高値

        Returns:
            dict: 健全性チェック結果
        """
        if peak_value == 0:
            peak_value = portfolio_value

        drawdown = (portfolio_value - peak_value) / peak_value

        return {
            'is_healthy': drawdown > -self.config.MAX_DRAWDOWN,
            'current_drawdown': drawdown,
            'max_drawdown_threshold': -self.config.MAX_DRAWDOWN,
        }

# ==============================
# ステップ4:パフォーマンス追跡
# ==============================
class PerformanceTracker:
    """運用成績を追跡"""

    def __init__(self, config=Config):
        """
        Args:
            config: システム設定
        """
        self.config = config
        self.history = self._load_history()

    def _load_history(self):
        """履歴を読み込む"""
        log_file = Path(self.config.PERFORMANCE_LOG)

        if log_file.exists():
            return pd.read_csv(log_file)

        return pd.DataFrame()

    def record_performance(self, portfolio_value, total_invested):
        """パフォーマンスを記録"""
        profit = portfolio_value - total_invested
        profit_rate = profit / total_invested if total_invested > 0 else 0

        record = {
            'date': datetime.now().isoformat(),
            'portfolio_value': portfolio_value,
            'total_invested': total_invested,
            'profit': profit,
            'profit_rate': profit_rate,
        }

        record_df = pd.DataFrame([record])

        if Path(self.config.PERFORMANCE_LOG).exists():
            record_df.to_csv(self.config.PERFORMANCE_LOG, mode='a', 
                            header=False, index=False)
        else:
            record_df.to_csv(self.config.PERFORMANCE_LOG, index=False)

        logging.info(f"パフォーマンス記録: 利益率 {profit_rate*100:+.2f}%")

    def display_summary(self):
        """パフォーマンスサマリーを表示"""
        if self.history.empty:
            return

        latest = self.history.iloc[-1]

        print("\n" + "="*70)
        print("パフォーマンスサマリー")
        print("="*70)
        print(f"総資産額: ¥{latest['portfolio_value']:,.0f}")
        print(f"累計投資額: ¥{latest['total_invested']:,.0f}")
        print(f"含み益: ¥{latest['profit']:,.0f}")
        print(f"運用利回り: {latest['profit_rate']*100:+.2f}%")

# ==============================
# ステップ5:LINE通知システム
# ==============================
class NotificationManager:
    """LINE Notifyで通知を送信"""

    @staticmethod
    def send_signal_notification(analysis_results, config=Config):
        """
        取引シグナルをLINEで通知

        Args:
            analysis_results: 分析結果
            config: システム設定
        """
        if config.LINE_TOKEN == "YOUR_LINE_NOTIFY_TOKEN":
            logging.warning("LINE_TOKEN が設定されていません")
            return

        # シグナル別に分類
        buy_signals = [r for r in analysis_results.values() 
                      if r and r['signal'] == 'BUY']
        sell_signals = [r for r in analysis_results.values() 
                       if r and r['signal'] == 'SELL']

        if not buy_signals and not sell_signals:
            return

        # メッセージを作成
        message = f"\n【AI投資システム シグナル通知】\n"
        message += f"実行時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"

        if buy_signals:
            message += "🟢【買いシグナル】\n"
            for signal in buy_signals:
                message += f"{signal['symbol']}: ¥{signal['price']:,.0f}\n"
                message += f"  MA短期: {signal['ma_short']:.0f}, 長期: {signal['ma_long']:.0f}\n"

        if sell_signals:
            message += "🔴【売りシグナル】\n"
            for signal in sell_signals:
                message += f"{signal['symbol']}: ¥{signal['price']:,.0f}\n"

        message += "\n※このシステムは感情を排除した自動判定です\n"
        message += "※SBI証券での発注は自己責任で行ってください"

        # LINE通知を送信
        try:
            headers = {"Authorization": f"Bearer {config.LINE_TOKEN}"}
            data = {"message": message}
            requests.post(config.LINE_API_URL, headers=headers, data=data)
            logging.info("LINE通知を送信しました")
        except Exception as e:
            logging.error(f"通知エラー: {e}")

# ==============================
# メインシステム
# ==============================
class AIInvestmentSystem:
    """統合されたAI投資システム"""

    def __init__(self):
        """初期化"""
        self.portfolio_manager = PortfolioManager()
        self.technical_analyzer = TechnicalAnalyzer()
        self.risk_manager = RiskManager()
        self.performance_tracker = PerformanceTracker()

    def run_daily_analysis(self):
        """日次分析を実行"""
        print("="*70)
        print("AI投資システム 日次実行")
        print("="*70)
        print()

        # 現在価格を取得
        prices = self.portfolio_manager.fetch_current_prices()

        if not prices:
            print("[エラー] 価格取得失敗")
            return

        # 各銘柄を分析
        print("[進行中] 各銘柄を分析中...")
        analysis_results = {}

        for symbol in Config.PORTFOLIO.keys():
            result = self.technical_analyzer.analyze_symbol(symbol)
            if result:
                analysis_results[symbol] = result

        # ポートフォリオ価値を計算
        portfolio_value = self.portfolio_manager.calculate_portfolio_value(prices)

        # 必要に応じてリバランス
        if self.portfolio_manager.should_rebalance():
            self.portfolio_manager.rebalance_portfolio(prices)

        # 月次積立を実行(月初の場合)
        if datetime.now().day == 1:
            self.portfolio_manager.add_monthly_contribution(
                Config.MONTHLY_CONTRIBUTION, prices
            )

        # パフォーマンスを記録
        self.performance_tracker.record_performance(
            portfolio_value,
            self.portfolio_manager.state['total_invested']
        )

        # LINE通知を送信
        NotificationManager.send_signal_notification(analysis_results)

        # パフォーマンス表示
        self.performance_tracker.display_summary()

        print("\n[成功] 日次分析完了")

# ==============================
# エントリーポイント
# ==============================
if __name__ == "__main__":
    system = AIInvestmentSystem()
    system.run_daily_analysis()

処理フロー:

  • 現在価格を取得 → 各銘柄をテクニカル分析
  • シグナルを生成 → LINE通知
  • ポートフォリオをリバランス → パフォーマンスを記録
  • 次の実行まで待機

AI投資システムの運用方法

ステップ1:環境構築(初回のみ)

# 仮想環境を作成
python -m venv venv

# 仮想環境を有効化
venv\Scripts\activate

# ライブラリをインストール
pip install yfinance pandas numpy requests

ステップ2:設定をカスタマイズ

# Config クラスで以下を設定:
# - PORTFOLIO: あなたの投資銘柄と配分
# - INITIAL_CAPITAL: 初期資金
# - MONTHLY_CONTRIBUTION: 月次積立額
# - LINE_TOKEN: LINE Notify のトークン

ステップ3:Windowsタスクスケジューラで自動実行

1. タスクスケジューラを開く
2. 基本タスクを作成
3. 毎日 16:30 に実行するよう設定
4. プログラム: C:\path\to\python.exe
5. 引数: main.py

よくあるエラーと対処法

エラー1:「ModuleNotFoundError: No module named ‘yfinance’」

原因:ライブラリがインストールされていません。

対処法:

  • 仮想環境が有効になっているか確認(コマンドプロンプトに (venv) が表示されているか)
  • pip install yfinance を実行

エラー2:「LINE通知が来ない」

原因:LINE_TOKEN が設定されていないか、有効期限が切れています。

対処法:

  • https://notify-bot.line.me/my/ で新しいトークンを生成
  • スクリプトの Config.LINE_TOKEN に正確に貼り付け

エラー3:「タスクスケジューラで実行されない」

原因:Pythonパスが正確でないか、作業ディレクトリが設定されていません。

対処法:

  • where python でPythonの完全パスを確認
  • 「開始位置」にスクリプトの親ディレクトリを指定

まとめ

本記事では、ブラックボックスのAI投資サービスに頼らず、Pythonを使って個人投資家が実装可能な「感情を排除したAI投資アルゴリズム」の構築方法を解説しました。

要点を整理します。

  • ブラックボックスの危険性 → 手数料が高く、内部ロジックが不透明
  • 自作システムのメリット → 透明性、コスト無料、完全にカスタマイズ可能
  • シンプルさが鍵 → 複雑な機械学習より、シンプルなルールベースが安定
  • 感情を排除 → 自動化によって、市場の上下動に左右されない判断が可能
  • 実装は容易 → yfinance + Python + LINE Notify で完全に実現可能

次のステップとしては、本記事で提供したコードを自分の環境で実行し、実際に毎日のシグナル通知を受け取ることをお勧めします。その体験を通じて、「AIが判断した理由」を理解することで、アルゴリズム投資への信頼が深まります。

あなたの投資が、透明で理解可能で、感情を排除した判断に基づくものになることを願っています。

タイトルとURLをコピーしました