import marimo

__generated_with = "0.23.2"
app = marimo.App()


@app.cell
def _():
    import marimo as mo
    return (mo,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # Quant Trading Scaffold
    ## Data Ingestion → Indicators → Walk-Forward ML → Backtesting → Tearsheet

    Pipeline:
    1. **Ingest** OHLCV data via yfinance
    2. **Engineer features** — momentum, trend, volatility, volume indicators
    3. **Label** — binary classification (next-N-day return > 0)
    4. **Walk-forward split** with purging (no leakage)
    5. **Train** XGBoost classifier per fold
    6. **Evaluate** with quantstats tearsheet
    """)
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 1. Config & Imports
    """)
    return


@app.cell
def _():
    # NOTE: no `from __future__ import annotations` here — a __future__ import
    # inside a function body is a SyntaxError when this file is run as a plain
    # script, and none of the annotations in this notebook need it.
    import warnings
    warnings.filterwarnings("ignore", category=FutureWarning)

    import numpy as np
    import pandas as pd
    import pandas_ta as ta
    import yfinance as yf
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    from sklearn.model_selection import TimeSeriesSplit
    from sklearn.metrics import accuracy_score, classification_report
    from xgboost import XGBClassifier
    import quantstats as qs

    # ── Config ──────────────────────────────────────────────────────
    TICKER = "AAPL"
    START = "2015-01-01"
    END = "2025-12-31"
    HORIZON = 5        # predict N-day forward return
    PURGE_GAP = 5      # gap between train/test to prevent leakage
    N_SPLITS = 5       # walk-forward folds
    TRAIN_MIN = 504    # ~2 years minimum training window

    print(f"Config: {TICKER} | {START}→{END} | horizon={HORIZON}d | {N_SPLITS} folds")
    return (
        END,
        HORIZON,
        N_SPLITS,
        PURGE_GAP,
        START,
        TICKER,
        TRAIN_MIN,
        XGBClassifier,
        accuracy_score,
        classification_report,
        go,
        make_subplots,
        np,
        pd,
        qs,
        ta,
        yf,
    )


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 2. Data Ingestion
    """)
    return


@app.cell
def _(END, START, TICKER, pd, yf):
    raw = yf.download(TICKER, start=START, end=END, auto_adjust=True)

    # yfinance may return MultiIndex columns for single ticker — flatten
    if isinstance(raw.columns, pd.MultiIndex):
        raw.columns = raw.columns.droplevel("Ticker")

    raw.index = pd.DatetimeIndex(raw.index)
    df = raw.copy()

    print(f"Downloaded {len(df)} bars: {df.index[0].date()} → {df.index[-1].date()}")
    df.tail(3)
    return (df,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 3. Feature Engineering — Technical Indicators

    We compute features across 4 categories:
    - **Momentum**: RSI, MACD, Stochastic, Williams %R, ROC
    - **Trend**: SMA/EMA crossovers, ADX, Ichimoku
    - **Volatility**: Bollinger Bands, ATR, Keltner Channels
    - **Volume**: OBV, MFI, Accumulation/Distribution
    """)
    return


@app.cell
def _(df, ta):
    def _pick(frame, prefix):
        """Return the first column of *frame* whose name starts with *prefix*.

        pandas_ta names its output columns (e.g. ``MACDh_12_26_9``,
        ``BBL_20_2.0``) and the positional order is neither intuitive nor
        stable across releases, so columns are selected by name instead of
        by position.

        Raises:
            KeyError: if no column name starts with *prefix*.
        """
        for _name in frame.columns:
            if str(_name).startswith(prefix):
                return frame[_name]
        raise KeyError(
            f"No column starting with {prefix!r} in {list(frame.columns)}"
        )

    # ── Momentum ────────────────────────────────────────────────────
    df["rsi_14"] = ta.rsi(df["Close"], length=14)
    df["rsi_7"] = ta.rsi(df["Close"], length=7)

    # ta.macd returns (MACD, histogram, signal) in that order — select by name
    macd = ta.macd(df["Close"], fast=12, slow=26, signal=9)
    df["macd"] = _pick(macd, "MACD_")          # MACD line
    df["macd_signal"] = _pick(macd, "MACDs_")  # signal line
    df["macd_hist"] = _pick(macd, "MACDh_")    # histogram

    stoch = ta.stoch(df["High"], df["Low"], df["Close"])
    df["stoch_k"] = _pick(stoch, "STOCHk_")
    df["stoch_d"] = _pick(stoch, "STOCHd_")

    df["willr_14"] = ta.willr(df["High"], df["Low"], df["Close"], length=14)
    df["roc_10"] = ta.roc(df["Close"], length=10)
    df["roc_21"] = ta.roc(df["Close"], length=21)
    df["mom_10"] = ta.mom(df["Close"], length=10)

    # ── Trend ───────────────────────────────────────────────────────
    df["sma_20"] = ta.sma(df["Close"], length=20)
    df["sma_50"] = ta.sma(df["Close"], length=50)
    df["sma_200"] = ta.sma(df["Close"], length=200)
    df["ema_12"] = ta.ema(df["Close"], length=12)
    df["ema_26"] = ta.ema(df["Close"], length=26)

    # crossover features (price relative to MAs)
    df["close_over_sma20"] = (df["Close"] / df["sma_20"]) - 1
    df["close_over_sma50"] = (df["Close"] / df["sma_50"]) - 1
    df["close_over_sma200"] = (df["Close"] / df["sma_200"]) - 1
    df["sma20_over_sma50"] = (df["sma_20"] / df["sma_50"]) - 1
    df["sma50_over_sma200"] = (df["sma_50"] / df["sma_200"]) - 1

    # some pandas_ta releases insert an ADXR column, so select by name
    adx = ta.adx(df["High"], df["Low"], df["Close"], length=14)
    df["adx"] = _pick(adx, "ADX_")
    df["di_plus"] = _pick(adx, "DMP_")
    df["di_minus"] = _pick(adx, "DMN_")

    # ── Volatility ──────────────────────────────────────────────────
    # ta.bbands returns (lower, mid, upper, bandwidth, %B) — select by name
    bbands = ta.bbands(df["Close"], length=20, std=2)
    df["bb_upper"] = _pick(bbands, "BBU_")
    df["bb_mid"] = _pick(bbands, "BBM_")
    df["bb_lower"] = _pick(bbands, "BBL_")
    df["bb_width"] = _pick(bbands, "BBB_")
    df["bb_pctb"] = _pick(bbands, "BBP_")  # %B: where price is within bands

    df["atr_14"] = ta.atr(df["High"], df["Low"], df["Close"], length=14)
    df["atr_pct"] = df["atr_14"] / df["Close"]  # normalized ATR

    # ta.kc returns (lower, basis, upper) — select by name
    kc = ta.kc(df["High"], df["Low"], df["Close"], length=20)
    df["kc_upper"] = _pick(kc, "KCU")
    df["kc_lower"] = _pick(kc, "KCL")

    # volatility: rolling std of returns
    df["vol_10"] = df["Close"].pct_change().rolling(10).std()
    df["vol_21"] = df["Close"].pct_change().rolling(21).std()

    # ── Volume ──────────────────────────────────────────────────────
    df["obv"] = ta.obv(df["Close"], df["Volume"])
    df["obv_sma20"] = ta.sma(df["obv"], length=20)
    df["mfi_14"] = ta.mfi(df["High"], df["Low"], df["Close"], df["Volume"], length=14)

    ad = ta.ad(df["High"], df["Low"], df["Close"], df["Volume"])
    df["ad_line"] = ad

    # volume relative to average
    df["vol_ratio_20"] = df["Volume"] / df["Volume"].rolling(20).mean()

    # ── Returns features ────────────────────────────────────────────
    df["ret_1d"] = df["Close"].pct_change(1)
    df["ret_5d"] = df["Close"].pct_change(5)
    df["ret_10d"] = df["Close"].pct_change(10)
    df["ret_21d"] = df["Close"].pct_change(21)

    print(f"Total columns after feature engineering: {len(df.columns)}")
    df.tail(3)
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 4. Labeling — Forward Return Classification

    Target: is the N-day forward return positive? (buy signal = 1, sell/hold signal = 0)
    """)
    return


@app.cell
def _(HORIZON, df):
    # forward return (what we're predicting)
    df["fwd_ret"] = df["Close"].pct_change(HORIZON).shift(-HORIZON)
    df["label"] = (df["fwd_ret"] > 0).astype(int)

    # ── Define feature columns (exclude raw OHLCV, target, and non-stationary cols)
    EXCLUDE = {
        "Open", "High", "Low", "Close", "Volume",
        "fwd_ret", "label",
        "sma_20", "sma_50", "sma_200", "ema_12", "ema_26",  # non-stationary
        "bb_upper", "bb_mid", "bb_lower",                   # non-stationary
        "kc_upper", "kc_lower",                             # non-stationary
        "obv", "obv_sma20", "ad_line",                      # non-stationary
    }
    FEATURES = [c for c in df.columns if c not in EXCLUDE]

    # drop rows with NaN (from indicator warm-up + forward label);
    # the last HORIZON rows have NaN fwd_ret, so their placeholder 0 labels
    # are removed here as well
    model_df = df[FEATURES + ["label", "fwd_ret"]].dropna()

    print(f"Features: {len(FEATURES)}")
    print(f"Usable rows: {len(model_df)} ({model_df.index[0].date()} → {model_df.index[-1].date()})")
    print(f"Label balance: {model_df['label'].value_counts(normalize=True).to_dict()}")
    print(f"\nFeature list:\n{FEATURES}")
    return FEATURES, model_df


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 5. Walk-Forward Split with Purge Gap

    Time series data **cannot** use random k-fold — future data would leak into training.

    We use **expanding-window walk-forward** with a **purge gap** between train/test:

    ```
    Fold 1: [====TRAIN====]--gap--[TEST]
    Fold 2: [========TRAIN========]--gap--[TEST]
    Fold 3: [============TRAIN============]--gap--[TEST]
    ```

    The gap prevents label leakage from overlapping forward-return windows.
    """)
    return


@app.cell
def _(FEATURES, N_SPLITS, PURGE_GAP, TRAIN_MIN, go, model_df, np):
    def walk_forward_splits(n_samples: int, n_splits: int, test_size: int = 126,
                            purge_gap: int = 5, min_train: int = 504):
        """
        Expanding-window walk-forward with purge gap.

        Yields (train_idx, test_idx) positional index arrays. Test windows are
        contiguous and anchored to the end of the sample; every training
        window starts at row 0 and stops ``purge_gap`` rows before its test
        window.

        test_size:  ~6 months of trading days
        min_train:  ~2 years of trading days
        purge_gap:  days between train end and test start

        Raises ValueError when the sample is too short for the requested
        number of folds.
        """
        total_test = n_splits * test_size
        required = min_train + total_test + n_splits * purge_gap
        if required > n_samples:
            raise ValueError(f'Not enough data for {n_splits} splits. Need {min_train + total_test + n_splits * purge_gap}, have {n_samples}')

        for i in range(n_splits):
            test_end = n_samples - (n_splits - 1 - i) * test_size
            test_start = test_end - test_size
            train_end = test_start - purge_gap
            # expanding window (use max(0, train_end - fixed_window) for sliding)
            train_start = 0
            train_idx = np.arange(train_start, train_end)
            test_idx = np.arange(test_start, test_end)
            yield (train_idx, test_idx)

    X = model_df[FEATURES].values
    y = model_df['label'].values
    dates = model_df.index

    # ── Visualize the splits ────────────────────────────────────────
    _fig = go.Figure()
    for _fold, (_tr_idx, _te_idx) in enumerate(
        walk_forward_splits(len(X), N_SPLITS, purge_gap=PURGE_GAP, min_train=TRAIN_MIN)
    ):
        _fig.add_trace(go.Scatter(
            x=[dates[_tr_idx[0]], dates[_tr_idx[-1]]],
            y=[_fold, _fold],
            mode='lines',
            line=dict(color='steelblue', width=8),
            name=f'Train {_fold}' if _fold == 0 else None,
            showlegend=_fold == 0,
        ))
        _fig.add_trace(go.Scatter(
            x=[dates[_te_idx[0]], dates[_te_idx[-1]]],
            y=[_fold, _fold],
            mode='lines',
            line=dict(color='coral', width=8),
            name=f'Test {_fold}' if _fold == 0 else None,
            showlegend=_fold == 0,
        ))
        print(f'Fold {_fold}: train {dates[_tr_idx[0]].date()}→{dates[_tr_idx[-1]].date()} ({len(_tr_idx)}d) | test {dates[_te_idx[0]].date()}→{dates[_te_idx[-1]].date()} ({len(_te_idx)}d)')

    _fig.update_layout(title='Walk-Forward Splits', yaxis_title='Fold', height=300)
    _fig.show()
    return X, dates, walk_forward_splits, y


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 6. Train XGBoost per Fold — Walk-Forward

    Train on expanding window, predict test fold, collect out-of-sample predictions.
    Early stopping is monitored on a purged validation tail of the training window,
    never on the test fold.
    """)
    return


@app.cell
def _(
    N_SPLITS,
    PURGE_GAP,
    TRAIN_MIN,
    X,
    XGBClassifier,
    accuracy_score,
    classification_report,
    dates,
    model_df,
    walk_forward_splits,
    y,
):
    # Fraction of each training window held back (at its end) for early stopping.
    _VAL_FRACTION = 0.15

    oos_preds = []    # out-of-sample predictions
    oos_proba = []    # predicted probabilities
    oos_labels = []
    oos_dates = []
    oos_fwd_ret = []
    fold_metrics = []

    for _fold, (_tr_idx, _te_idx) in enumerate(
        walk_forward_splits(len(X), N_SPLITS, purge_gap=PURGE_GAP, min_train=TRAIN_MIN)
    ):
        # Early stopping must not see the test fold: using it as eval_set
        # would tune the number of trees on the very data we report as OOS.
        # Carve the validation block from the tail of the training window and
        # purge PURGE_GAP rows before it so forward-return labels don't overlap.
        _val_size = max(1, int(len(_tr_idx) * _VAL_FRACTION))
        _val_idx = _tr_idx[-_val_size:]
        _fit_idx = _tr_idx[: max(0, len(_tr_idx) - _val_size - PURGE_GAP)]
        if len(_fit_idx) == 0:
            raise ValueError(
                f'Fold {_fold}: training window of {len(_tr_idx)} rows is too '
                f'small to hold out {_val_size} validation rows plus a '
                f'{PURGE_GAP}-row purge gap'
            )

        X_train, y_train = (X[_fit_idx], y[_fit_idx])
        X_val, y_val = (X[_val_idx], y[_val_idx])
        X_test, y_test = (X[_te_idx], y[_te_idx])

        model = XGBClassifier(
            n_estimators=300,
            max_depth=4,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            reg_alpha=0.1,
            reg_lambda=1.0,
            random_state=42,
            eval_metric='logloss',
            early_stopping_rounds=30,
        )
        model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)

        preds = model.predict(X_test)
        proba = model.predict_proba(X_test)[:, 1]
        acc = accuracy_score(y_test, preds)

        oos_preds.extend(preds)
        oos_proba.extend(proba)
        oos_labels.extend(y_test)
        oos_dates.extend(dates[_te_idx])
        oos_fwd_ret.extend(model_df['fwd_ret'].values[_te_idx])

        # train_size reports the full training window (fit + purge + validation)
        fold_metrics.append({'fold': _fold, 'accuracy': acc, 'train_size': len(_tr_idx), 'test_size': len(_te_idx)})
        print(f'Fold {_fold}: acc={acc:.3f} | train={len(_tr_idx)} | test={len(_te_idx)}')

    print(f'\nOverall OOS accuracy: {accuracy_score(oos_labels, oos_preds):.3f}')
    print(classification_report(oos_labels, oos_preds, target_names=['SELL/HOLD', 'BUY']))
    return model, oos_dates, oos_fwd_ret, oos_preds, oos_proba


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 7. Feature Importance (Last Fold)
    """)
    return


@app.cell
def _(FEATURES, go, model, pd):
    # `model` is whatever the training loop fitted last, i.e. the final fold
    imp = pd.Series(model.feature_importances_, index=FEATURES).sort_values(ascending=True)

    _fig = go.Figure(go.Bar(x=imp.tail(20), y=imp.tail(20).index, orientation='h'))
    _fig.update_layout(title='Top 20 Feature Importances (last fold)', height=500, margin=dict(l=150))
    _fig.show()
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 8. Strategy Simulation — Signal → Returns

    Convert model predictions to a strategy equity curve:
    - **Signal = 1 (BUY)**: go long (earn the market return)
    - **Signal = 0 (SELL/HOLD)**: stay in cash (earn 0)
    - Signals are acted on with a **one-bar lag**: a signal computed at today's close
      earns tomorrow's close-to-close return.

    Compare against buy-and-hold benchmark.
    """)
    return


@app.cell
def _(df, go, oos_dates, oos_fwd_ret, oos_preds, oos_proba, pd):
    # Build strategy returns series from OOS predictions
    strat = pd.DataFrame({
        'date': oos_dates,
        'signal': oos_preds,
        'proba': oos_proba,
        'fwd_ret': oos_fwd_ret,
    }).set_index('date')

    # daily returns: we use daily close-to-close returns, masked by position
    # align with actual daily returns (not forward returns) for proper equity curve
    daily_ret = df['Close'].pct_change().reindex(strat.index)

    # The signal at bar t is computed from Close[t], while daily_ret[t] is the
    # return from Close[t-1] to Close[t]. Multiplying them directly would trade
    # on information that was not yet available, so the position held during
    # bar t is the signal from bar t-1 (flat on the very first bar).
    strat['position'] = strat['signal'].shift(1).fillna(0)

    # strategy return: market return when in position, 0 when flat
    strat['strat_ret'] = daily_ret * strat['position']
    strat['bench_ret'] = daily_ret

    # cumulative
    strat['strat_equity'] = (1 + strat['strat_ret']).cumprod()
    strat['bench_equity'] = (1 + strat['bench_ret']).cumprod()

    # plot
    _fig = go.Figure()
    _fig.add_trace(go.Scatter(x=strat.index, y=strat['strat_equity'], name='Strategy', line=dict(color='steelblue')))
    _fig.add_trace(go.Scatter(x=strat.index, y=strat['bench_equity'], name='Buy & Hold', line=dict(color='gray', dash='dot')))

    # shade the periods the strategy is actually in the market
    in_market = strat['position'] == 1
    _changes = in_market.astype(int).diff().fillna(0)
    entries = strat.index[_changes == 1]
    exits = strat.index[_changes == -1]

    # align: if already in the market on the first bar, start from beginning
    if in_market.iloc[0]:
        entries = entries.insert(0, strat.index[0])
    # align: if still in the market on the last bar, close at the end
    if in_market.iloc[-1]:
        exits = exits.append(pd.DatetimeIndex([strat.index[-1]]))

    for ent, ext in zip(entries, exits):
        _fig.add_vrect(x0=ent, x1=ext, fillcolor='green', opacity=0.07, line_width=0)

    _fig.update_layout(title='Strategy vs Buy & Hold (OOS)', yaxis_title='Equity ($1 start)', height=450)
    _fig.show()

    print(f"Strategy final: ${strat['strat_equity'].iloc[-1]:.2f}")
    print(f"Benchmark final: ${strat['bench_equity'].iloc[-1]:.2f}")
    return (strat,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 9. QuantStats Tearsheet

    Full performance report: Sharpe, Sortino, max drawdown, rolling metrics, monthly heatmap.
    """)
    return


@app.cell
def _(pd, qs, strat):
    # quantstats expects a returns series with datetime index
    strategy_returns = strat["strat_ret"].copy()
    strategy_returns.index = pd.DatetimeIndex(strategy_returns.index)

    benchmark_returns = strat["bench_ret"].copy()
    benchmark_returns.index = pd.DatetimeIndex(benchmark_returns.index)

    qs.extend_pandas()

    # key metrics
    print("=" * 50)
    print("STRATEGY METRICS (out-of-sample)")
    print("=" * 50)
    print(f"Sharpe: {qs.stats.sharpe(strategy_returns):.2f}")
    print(f"Sortino: {qs.stats.sortino(strategy_returns):.2f}")
    print(f"Max Drawdown: {qs.stats.max_drawdown(strategy_returns):.2%}")
    print(f"CAGR: {qs.stats.cagr(strategy_returns):.2%}")
    print(f"Calmar: {qs.stats.calmar(strategy_returns):.2f}")
    print(f"Win Rate: {qs.stats.win_rate(strategy_returns):.2%}")
    print(f"Volatility: {qs.stats.volatility(strategy_returns):.2%}")
    print(f"Avg Win: {qs.stats.avg_win(strategy_returns):.4f}")
    print(f"Avg Loss: {qs.stats.avg_loss(strategy_returns):.4f}")
    print(f"Profit Factor:{qs.stats.profit_factor(strategy_returns):.2f}")
    print("=" * 50)
    return benchmark_returns, strategy_returns


@app.cell
def _(TICKER, benchmark_returns, qs, strategy_returns):
    # full HTML tearsheet — saved to file + displayed inline
    qs.reports.html(
        strategy_returns,
        benchmark=benchmark_returns,
        title=f"{TICKER} ML Signal Strategy (OOS Walk-Forward)",
        output="tearsheet.html",
    )
    print("Tearsheet saved to tearsheet.html")
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## 10. Signal Dashboard — Price + Indicators + Buy/Sell Signals
    """)
    return


@app.cell
def _(TICKER, df, go, make_subplots, strat):
    # show last fold's test period with signals overlaid on price
    last_test_dates = strat.index[-126:]  # last ~6 months
    viz = df.loc[last_test_dates].copy()
    sig = strat.loc[last_test_dates]

    _fig = make_subplots(
        rows=4,
        cols=1,
        shared_xaxes=True,
        row_heights=[0.4, 0.2, 0.2, 0.2],
        vertical_spacing=0.03,
        subplot_titles=['Price + Bollinger Bands + Signals', 'RSI(14)', 'MACD', 'Volume'],
    )

    # Row 1: Candlestick + BB + signals
    _fig.add_trace(go.Candlestick(x=viz.index, open=viz['Open'], high=viz['High'], low=viz['Low'], close=viz['Close'], name='OHLC', increasing_line_color='steelblue', decreasing_line_color='salmon'), row=1, col=1)
    _fig.add_trace(go.Scatter(x=viz.index, y=viz['bb_upper'], line=dict(color='gray', width=1, dash='dot'), name='BB Upper'), row=1, col=1)
    _fig.add_trace(go.Scatter(x=viz.index, y=viz['bb_lower'], line=dict(color='gray', width=1, dash='dot'), name='BB Lower', fill='tonexty', fillcolor='rgba(128,128,128,0.05)'), row=1, col=1)
    _fig.add_trace(go.Scatter(x=viz.index, y=viz['sma_50'], line=dict(color='orange', width=1), name='SMA 50'), row=1, col=1)

    # buy/sell markers: placed on the bars where the model signal flips
    buy_mask = sig['signal'] == 1
    _changes = buy_mask.astype(int).diff()
    buy_entries = sig.index[_changes == 1]
    sell_entries = sig.index[_changes == -1]

    if len(buy_entries):
        _fig.add_trace(go.Scatter(x=buy_entries, y=viz.loc[buy_entries, 'Low'] * 0.995, mode='markers', marker=dict(symbol='triangle-up', size=10, color='green'), name='BUY'), row=1, col=1)
    if len(sell_entries):
        _fig.add_trace(go.Scatter(x=sell_entries, y=viz.loc[sell_entries, 'High'] * 1.005, mode='markers', marker=dict(symbol='triangle-down', size=10, color='red'), name='SELL'), row=1, col=1)

    # Row 2: RSI
    _fig.add_trace(go.Scatter(x=viz.index, y=viz['rsi_14'], line=dict(color='purple', width=1.5), name='RSI 14'), row=2, col=1)
    _fig.add_hline(y=70, line_dash='dash', line_color='red', opacity=0.5, row=2, col=1)
    _fig.add_hline(y=30, line_dash='dash', line_color='green', opacity=0.5, row=2, col=1)

    # Row 3: MACD
    _fig.add_trace(go.Scatter(x=viz.index, y=viz['macd'], line=dict(color='blue', width=1.5), name='MACD'), row=3, col=1)
    _fig.add_trace(go.Scatter(x=viz.index, y=viz['macd_signal'], line=dict(color='orange', width=1), name='Signal'), row=3, col=1)
    colors = ['green' if v >= 0 else 'red' for v in viz['macd_hist']]
    _fig.add_trace(go.Bar(x=viz.index, y=viz['macd_hist'], marker_color=colors, name='Hist', opacity=0.5), row=3, col=1)

    # Row 4: Volume
    _fig.add_trace(go.Bar(x=viz.index, y=viz['Volume'], marker_color='steelblue', name='Volume', opacity=0.5), row=4, col=1)
    _fig.add_trace(go.Scatter(x=viz.index, y=viz['Volume'].rolling(20).mean(), line=dict(color='orange', width=1), name='Vol SMA20'), row=4, col=1)

    _fig.update_layout(height=900, title=f'{TICKER} — Last Test Fold Signal Dashboard', xaxis_rangeslider_visible=False, showlegend=False)
    _fig.update_xaxes(rangeslider_visible=False)
    _fig.show()
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## Next Steps

    Things to iterate on from here:

    1. **Multi-asset**: swap `TICKER` to BTC-USD, QQQ, GLD, etc. or loop over a universe
    2. **Probability threshold**: instead of binary 0/1, use `proba > 0.6` for higher-conviction signals
    3. **Position sizing**: Kelly criterion via `PyPortfolioOpt` based on predicted probability
    4. **Regime filter**: add ADX/volatility regime detection — only trade in trending regimes
    5. **Transaction costs**: subtract realistic slippage (e.g., 5bps per trade) from returns
    6. **Alternative splitters you have installed**:
       - `from tscv import GapWalkForward` — sklearn-compatible, handles gap + purge natively
       - `from sktime.split import ExpandingWindowSplitter, SlidingWindowSplitter`
       - `from sklearn.model_selection import TimeSeriesSplit` — basic but solid
    7. **LightGBM**: drop-in replacement for XGBoost, often faster on large feature sets
    8. **Meta-labeling** (Lopez de Prado): train a secondary model on whether the primary model's signals are correct
    """)
    return


if __name__ == "__main__":
    app.run()