DataSynthis_ML_JobTask / src /labeling.py
moccaram's picture
Replace v1 demo with v2 XGBoost-backed Gradio app (reference-backed rebuild)
8ba081b verified
"""Triple-barrier labeling β€” AFML Ch.3 (BonusPDF pp.26-34).
The triple-barrier method assigns each event one of three labels based on which
of three barriers is hit first:
- ``+1`` β€” upper (profit-taking) horizontal barrier hit first
- ``-1`` β€” lower (stop-loss) horizontal barrier hit first
- ``0`` β€” vertical (max holding period) barrier hit first
The horizontal barriers are scaled by a per-event volatility estimate (typically
EWM daily vol, ``get_daily_vol`` in ``src/data.py``). This is a port of AFML
Snippets 3.2-3.5 and Rambo's cleaner ``get_triple_barrier_label`` (his repo,
``Chapter_3.py``).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
def apply_pt_sl_on_t1(
close: pd.Series, events: pd.DataFrame, pt_sl: tuple[float, float]
) -> pd.DataFrame:
"""AFML Snippet 3.2 (BonusPDF p.27). Find time of first barrier touch.
Parameters
----------
close : pd.Series
Closing-price series, indexed by date.
events : pd.DataFrame
Required columns: ``t1`` (vertical-barrier date or NaT), ``target``
(vol estimate at the event), ``side`` (+1 for long, -1 for short; if
we don't know side, pass +1 for all).
pt_sl : (float, float)
Profit-taking and stop-loss multipliers of ``target``. Pass 0 to disable
a barrier.
Returns
-------
pd.DataFrame indexed like ``events`` with columns ``t1, pt, sl`` containing
the first-touch timestamps (NaT if never touched).
"""
out = events[["t1"]].copy()
pt = pt_sl[0] * events["target"] if pt_sl[0] > 0 else pd.Series(np.nan, index=events.index)
sl = -pt_sl[1] * events["target"] if pt_sl[1] > 0 else pd.Series(np.nan, index=events.index)
for t0, t1 in events["t1"].fillna(close.index[-1]).items():
path_prices = close.loc[t0:t1]
path_returns = (path_prices / close.loc[t0] - 1) * events.at[t0, "side"]
sl_hits = path_returns[path_returns < sl[t0]]
pt_hits = path_returns[path_returns > pt[t0]]
out.at[t0, "sl"] = sl_hits.index.min() if len(sl_hits) else pd.NaT
out.at[t0, "pt"] = pt_hits.index.min() if len(pt_hits) else pd.NaT
return out
def add_vertical_barrier(
close: pd.Series, t_events: pd.DatetimeIndex, num_days: int
) -> pd.Series:
"""AFML Snippet 3.4 (BonusPDF p.30). Vertical (time-limit) barriers.
Returns a Series indexed by ``t_events`` whose values are ``num_days`` later,
snapped to the next available trading day; events too close to the end of
the series are dropped.
"""
t1 = close.index.searchsorted(t_events + pd.Timedelta(days=num_days))
t1 = t1[t1 < close.shape[0]]
return pd.Series(close.index[t1], index=t_events[: len(t1)])
def get_events(
close: pd.Series,
t_events: pd.DatetimeIndex,
pt_sl: tuple[float, float],
target: pd.Series,
min_ret: float,
num_days: int | None = None,
side: pd.Series | None = None,
) -> pd.DataFrame:
"""AFML Snippet 3.3 (BonusPDF p.29). Run triple-barrier for a batch of events.
Returns a DataFrame indexed by event start time with columns:
- ``t1`` (timestamp of the *first* barrier hit β€” earliest of vertical/pt/sl)
- ``vertical_t1`` (the original vertical-barrier date)
- ``barrier_hit`` (one of ``"vertical"`` / ``"pt"`` / ``"sl"`` β€” what was hit
first; used by ``get_bins`` to produce the {-1, 0, +1} label)
- ``target`` (vol estimate at the event)
If ``side`` is provided, it is propagated for downstream meta-labeling.
"""
target = target.reindex(t_events).dropna()
target = target[target > min_ret]
if num_days is not None:
vertical_t1 = add_vertical_barrier(close, target.index, num_days)
else:
vertical_t1 = pd.Series(pd.NaT, index=target.index)
if side is None:
side_ = pd.Series(1.0, index=target.index)
else:
side_ = side.reindex(target.index).fillna(1.0)
events = pd.concat(
{"t1": vertical_t1, "target": target, "side": side_}, axis=1
).dropna(subset=["target"])
touches = apply_pt_sl_on_t1(close, events, pt_sl)
# Drop events where no barrier ever fires (can't happen with a vertical
# barrier present, but defensive against future config changes).
touches = touches.dropna(subset=["t1", "pt", "sl"], how="all")
events = events.loc[touches.index]
# Earliest touch among (vertical, pt, sl); record which barrier won.
all_touches = touches[["t1", "pt", "sl"]]
earliest = all_touches.min(axis=1)
# Manual row-wise argmin: pandas' idxmin chokes on all-NaT slices.
barrier_hit = pd.Series("vertical", index=all_touches.index)
pt_arr = all_touches["pt"]
sl_arr = all_touches["sl"]
vert_arr = all_touches["t1"]
# Replace NaT with a very large date for comparison purposes
far = pd.Timestamp.max
cmp = pd.DataFrame(
{
"pt": pt_arr.fillna(far),
"sl": sl_arr.fillna(far),
"vertical": vert_arr.fillna(far),
}
)
barrier_hit = cmp.idxmin(axis=1)
events["vertical_t1"] = events["t1"]
events["t1"] = earliest
events["barrier_hit"] = barrier_hit.astype(str)
if side is None:
events = events.drop("side", axis=1)
return events.dropna(subset=["t1"])
def get_bins(events: pd.DataFrame, close: pd.Series) -> pd.DataFrame:
"""AFML Snippet 3.5 (BonusPDF p.30). Convert event outcomes to {-1, 0, +1}.
Full triple-barrier semantics: the label depends on which barrier was hit
*first*:
- ``barrier_hit == "pt"`` β†’ ``+1`` (profit-taking, scaled by ``side``)
- ``barrier_hit == "sl"`` β†’ ``-1`` (stop-loss, scaled by ``side``)
- ``barrier_hit == "vertical"`` β†’ ``0`` (no signal; the time limit ran out
before either horizontal barrier was hit)
If meta-labeling (``side`` column present), maps to ``{0, 1}`` for
"don't act" vs "act in this side".
"""
events_ = events.dropna(subset=["t1"]).copy()
px_idx = events_.index.union(events_["t1"].values).unique()
px = close.reindex(px_idx, method="bfill")
out = pd.DataFrame(index=events_.index)
out["ret"] = px.loc[events_["t1"].values].values / px.loc[events_.index].values - 1
if "side" in events_.columns:
out["ret"] *= events_["side"].values
if "barrier_hit" in events_.columns:
# Full triple-barrier: 0 when the vertical barrier (time limit) wins.
out["bin"] = 0
out.loc[events_["barrier_hit"] == "pt", "bin"] = 1
out.loc[events_["barrier_hit"] == "sl", "bin"] = -1
if "side" in events_.columns:
# meta-labeling: collapse to {0, 1} = "don't act / act"
out.loc[out["ret"] <= 0, "bin"] = 0
out.loc[out["bin"] != 0, "bin"] = 1
else:
# Fallback to AFML Snippet 3.5 default (sign of return)
out["bin"] = np.sign(out["ret"]).astype(int)
out["bin"] = out["bin"].astype(int)
return out
def drop_labels(events: pd.DataFrame, min_pct: float = 0.05) -> pd.DataFrame:
"""AFML Snippet 3.8 (BonusPDF p.34). Drop labels with < ``min_pct`` support.
Repeats until every remaining label has at least ``min_pct`` of observations
or fewer than 3 classes remain.
"""
while True:
counts = events["bin"].value_counts(normalize=True)
if counts.min() > min_pct or len(counts) < 3:
break
smallest = counts.idxmin()
events = events[events["bin"] != smallest]
print(f"Dropped label {smallest}: {100 * counts.min():.2f}% of observations")
return events
def cusum_filter(series: pd.Series, threshold: float) -> pd.DatetimeIndex:
"""Symmetric CUSUM filter β€” AFML Β§2.5.2 (general technique).
Generates event start times where the cumulative sum of returns (in either
direction) exceeds ``threshold``. Resets after each event. Returns a
DatetimeIndex of event-trigger timestamps.
Avoids the "predict on every bar" inefficiency by only labeling at
statistically interesting moments.
"""
t_events, s_pos, s_neg = [], 0.0, 0.0
diff = series.diff().fillna(0)
for t, d in diff.items():
s_pos = max(0.0, s_pos + d)
s_neg = min(0.0, s_neg + d)
if s_neg < -threshold:
s_neg = 0.0
t_events.append(t)
elif s_pos > threshold:
s_pos = 0.0
t_events.append(t)
return pd.DatetimeIndex(t_events)