File size: 8,439 Bytes
8ba081b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
"""Triple-barrier labeling β€” AFML Ch.3 (BonusPDF pp.26-34).

The triple-barrier method assigns each event one of three labels based on which
of three barriers is hit first:

- ``+1`` — upper (profit-taking) horizontal barrier hit first
- ``-1`` — lower (stop-loss) horizontal barrier hit first
- ``0``  — vertical (max holding period) barrier hit first

The horizontal barriers are scaled by a per-event volatility estimate (typically
EWM daily vol, ``get_daily_vol`` in ``src/data.py``). This is a port of AFML
Snippets 3.2-3.5 and Rambo's cleaner ``get_triple_barrier_label`` (his repo,
``Chapter_3.py``).
"""

from __future__ import annotations

import numpy as np
import pandas as pd


def _first_touch_series(stamps: list, index: pd.Index) -> pd.Series:
    """Build a first-touch column from per-event timestamps (NaT = no touch)."""
    if not stamps:
        # An empty list would otherwise infer object dtype; keep it datetime.
        return pd.Series(pd.NaT, index=index, dtype="datetime64[ns]")
    return pd.Series(stamps, index=index)


def apply_pt_sl_on_t1(
    close: pd.Series, events: pd.DataFrame, pt_sl: tuple[float, float]
) -> pd.DataFrame:
    """AFML Snippet 3.2 (BonusPDF p.27). Find time of first barrier touch.

    For every event the price path from the event start to its vertical
    barrier is converted to side-adjusted returns, and the first timestamp at
    which the return strictly exceeds the profit-taking level (or strictly
    falls below the stop-loss level) is recorded.

    Parameters
    ----------
    close : pd.Series
        Closing-price series, indexed by date. Every event start must be a
        label of this index.
    events : pd.DataFrame
        Required columns: ``t1`` (vertical-barrier date or NaT), ``target``
        (vol estimate at the event), ``side`` (+1 for long, -1 for short; if
        we don't know side, pass +1 for all).
    pt_sl : (float, float)
        Profit-taking and stop-loss multipliers of ``target``. Pass 0 to disable
        a barrier.

    Returns
    -------
    pd.DataFrame indexed like ``events`` with columns ``t1, pt, sl`` containing
    the first-touch timestamps (NaT if never touched). The ``pt`` and ``sl``
    columns are always present, even when ``events`` is empty.
    """
    out = events[["t1"]].copy()

    # A disabled barrier is an all-NaN level: every comparison against NaN is
    # False, so that barrier can never register a touch.
    disabled = pd.Series(np.nan, index=events.index)
    pt = pt_sl[0] * events["target"] if pt_sl[0] > 0 else disabled
    sl = -pt_sl[1] * events["target"] if pt_sl[1] > 0 else disabled

    pt_touch: list = []
    sl_touch: list = []
    # Events without a vertical barrier are evaluated up to the last close.
    for t0, t1 in events["t1"].fillna(close.index[-1]).items():
        path_prices = close.loc[t0:t1]
        path_returns = (path_prices / close.loc[t0] - 1) * events.at[t0, "side"]
        sl_hits = path_returns[path_returns < sl[t0]]
        pt_hits = path_returns[path_returns > pt[t0]]
        sl_touch.append(sl_hits.index.min() if len(sl_hits) else pd.NaT)
        pt_touch.append(pt_hits.index.min() if len(pt_hits) else pd.NaT)

    # Assign whole columns at once so both always exist with a datetime dtype,
    # instead of relying on per-row ``.at`` enlargement inside the loop.
    out["pt"] = _first_touch_series(pt_touch, out.index)
    out["sl"] = _first_touch_series(sl_touch, out.index)
    return out


def add_vertical_barrier(
    close: pd.Series, t_events: pd.DatetimeIndex, num_days: int
) -> pd.Series:
    """AFML Snippet 3.4 (BonusPDF p.30). Vertical (time-limit) barriers.

    Parameters
    ----------
    close : pd.Series
        Price series whose (sorted) index supplies the available timestamps.
    t_events : pd.DatetimeIndex
        Event start times. They do not need to be sorted.
    num_days : int
        Calendar days between an event and its vertical barrier.

    Returns
    -------
    pd.Series indexed by the retained ``t_events`` whose values are the first
    timestamp of ``close.index`` at or after ``event + num_days`` days. Events
    whose barrier would fall past the end of ``close`` are dropped.
    """
    positions = close.index.searchsorted(t_events + pd.Timedelta(days=num_days))
    # Filter events and positions with the same mask. Truncating ``t_events``
    # to the number of survivors pairs barriers with the wrong events whenever
    # the dropped events are not the trailing ones (e.g. unsorted input).
    in_range = positions < close.shape[0]
    return pd.Series(close.index[positions[in_range]], index=t_events[in_range])


def get_events(
    close: pd.Series,
    t_events: pd.DatetimeIndex,
    pt_sl: tuple[float, float],
    target: pd.Series,
    min_ret: float,
    num_days: int | None = None,
    side: pd.Series | None = None,
) -> pd.DataFrame:
    """AFML Snippet 3.3 (BonusPDF p.29). Run triple-barrier for a batch of events.

    Parameters
    ----------
    close : pd.Series
        Closing-price series, indexed by date.
    t_events : pd.DatetimeIndex
        Candidate event start times.
    pt_sl : (float, float)
        Profit-taking and stop-loss multipliers of ``target``.
    target : pd.Series
        Per-timestamp volatility estimate; events whose target is missing or
        not strictly greater than ``min_ret`` are discarded.
    min_ret : float
        Minimum target required to keep an event.
    num_days : int, optional
        Holding-period limit in calendar days. ``None`` disables the vertical
        barrier.
    side : pd.Series, optional
        Position side per event (+1 long / -1 short). Events missing from
        ``side`` default to +1.

    Returns
    -------
    A DataFrame indexed by event start time with columns:

    - ``t1`` (timestamp of the *first* barrier hit — earliest of vertical/pt/sl)
    - ``target`` (vol estimate at the event)
    - ``side`` (only when ``side`` was provided, for downstream meta-labeling)
    - ``vertical_t1`` (the original vertical-barrier date)
    - ``barrier_hit`` (one of ``"vertical"`` / ``"pt"`` / ``"sl"`` — what was hit
      first; used by ``get_bins`` to produce the {-1, 0, +1} label)
    """
    target = target.reindex(t_events).dropna()
    target = target[target > min_ret]

    if num_days is not None:
        vertical_t1 = add_vertical_barrier(close, target.index, num_days)
    else:
        vertical_t1 = pd.Series(pd.NaT, index=target.index)

    if side is None:
        side_ = pd.Series(1.0, index=target.index)
    else:
        side_ = side.reindex(target.index).fillna(1.0)

    events = pd.concat(
        {"t1": vertical_t1, "target": target, "side": side_}, axis=1
    ).dropna(subset=["target"])

    barrier_cols = ["t1", "pt", "sl"]
    # reindex guarantees all three columns exist, even if the touch finder
    # produced no horizontal-barrier columns (e.g. for an empty batch).
    touches = apply_pt_sl_on_t1(close, events, pt_sl).reindex(columns=barrier_cols)

    # Drop events where no barrier ever fires (e.g. no vertical barrier and
    # neither horizontal barrier touched before the end of the series).
    touches = touches.dropna(subset=barrier_cols, how="all")
    # Copy so that the column assignments below never write into a view.
    events = events.loc[touches.index].copy()

    # Earliest touch among (vertical, pt, sl); NaT entries are skipped.
    first_touch = touches.min(axis=1)

    # Record which barrier produced that earliest touch. Comparing against
    # NaT is always False, so untouched barriers can never win, and no
    # "far future" sentinel is needed (a naive sentinel would not mix with
    # tz-aware timestamps). On ties the precedence is pt, then sl, then
    # vertical.
    barrier_hit = np.select(
        [
            touches["pt"].eq(first_touch).to_numpy(),
            touches["sl"].eq(first_touch).to_numpy(),
        ],
        ["pt", "sl"],
        default="vertical",
    )

    events["vertical_t1"] = events["t1"]
    events["t1"] = first_touch
    events["barrier_hit"] = barrier_hit.astype(str)
    if side is None:
        events = events.drop("side", axis=1)
    return events.dropna(subset=["t1"])


def get_bins(events: pd.DataFrame, close: pd.Series) -> pd.DataFrame:
    """AFML Snippet 3.5 (BonusPDF p.30). Convert event outcomes to labels.

    Parameters
    ----------
    events : pd.DataFrame
        Indexed by event start time, with a ``t1`` column (exit timestamp) and
        optionally ``barrier_hit`` and ``side`` columns. Rows with a missing
        ``t1`` are ignored.
    close : pd.Series
        Price series; prices at timestamps missing from its index are
        back-filled from the next available observation.

    Returns
    -------
    pd.DataFrame indexed by event start time with columns ``ret`` (return
    between start and ``t1``, multiplied by ``side`` when present) and ``bin``
    (integer label).

    Labeling rules when ``barrier_hit`` is present:

    - ``barrier_hit == "pt"``  → ``+1``
    - ``barrier_hit == "sl"``  → ``-1``
    - anything else (``"vertical"``) → ``0``

    When ``side`` is also present (meta-labeling) the label is collapsed to
    ``{0, 1}``: ``1`` only for a barrier-driven exit with strictly positive
    ``ret``, ``0`` otherwise. Without ``barrier_hit`` the label is simply the
    sign of ``ret``.
    """
    labelled = events.dropna(subset=["t1"]).copy()
    exit_times = labelled["t1"].values
    has_side = "side" in labelled.columns

    # Look up every start and exit price in a single reindex pass.
    wanted = labelled.index.union(exit_times).unique()
    prices = close.reindex(wanted, method="bfill")
    exit_px = prices.loc[exit_times].values
    entry_px = prices.loc[labelled.index].values

    out = pd.DataFrame(index=labelled.index)
    out["ret"] = exit_px / entry_px - 1
    if has_side:
        out["ret"] *= labelled["side"].values

    if "barrier_hit" not in labelled.columns:
        # Fallback to AFML Snippet 3.5 default (sign of return)
        out["bin"] = np.sign(out["ret"]).astype(int)
    else:
        # Full triple-barrier: stays 0 unless a horizontal barrier won.
        hit = labelled["barrier_hit"]
        out["bin"] = 0
        out.loc[hit == "pt", "bin"] = 1
        out.loc[hit == "sl", "bin"] = -1
        if has_side:
            # meta-labeling: collapse to {0, 1} = "don't act / act"
            out.loc[out["ret"] <= 0, "bin"] = 0
            out.loc[out["bin"] != 0, "bin"] = 1

    out["bin"] = out["bin"].astype(int)
    return out


def drop_labels(events: pd.DataFrame, min_pct: float = 0.05) -> pd.DataFrame:
    """AFML Snippet 3.8 (BonusPDF p.34). Prune under-represented labels.

    Parameters
    ----------
    events : pd.DataFrame
        Must contain a ``bin`` column holding the labels.
    min_pct : float
        Minimum share of observations a label needs in order to be kept.

    Returns
    -------
    ``events`` without the rows of any removed label. The rarest label is
    removed repeatedly until every remaining label has more than ``min_pct``
    of the observations or fewer than 3 classes remain. Each removal is
    reported on stdout.
    """
    counts = events["bin"].value_counts(normalize=True)
    # Keep pruning while the stop condition (enough support, or too few
    # classes left) does not hold.
    while not (counts.min() > min_pct or len(counts) < 3):
        smallest = counts.idxmin()
        events = events[events["bin"] != smallest]
        print(f"Dropped label {smallest}: {100 * counts.min():.2f}% of observations")
        counts = events["bin"].value_counts(normalize=True)
    return events


def cusum_filter(series: pd.Series, threshold: float) -> pd.DatetimeIndex:
    """Symmetric CUSUM filter — AFML §2.5.2 (general technique).

    Two running sums of the first differences of ``series`` are tracked: an
    upward one floored at zero and a downward one capped at zero. A timestamp
    is emitted whenever the downward sum drops below ``-threshold`` or,
    failing that, the upward sum rises above ``threshold``. Only the sum that
    fired is reset to zero.

    Parameters
    ----------
    series : pd.Series
        Values to monitor; the first difference is treated as 0.
    threshold : float
        Size of cumulative move that triggers an event.

    Returns
    -------
    pd.DatetimeIndex of event-trigger timestamps, in iteration order.
    """
    triggers = []
    up_sum = down_sum = 0.0
    for stamp, delta in series.diff().fillna(0).items():
        up_sum = max(0.0, up_sum + delta)
        down_sum = min(0.0, down_sum + delta)
        # The downward check comes first; at most one event per timestamp.
        if down_sum < -threshold:
            down_sum = 0.0
        elif up_sum > threshold:
            up_sum = 0.0
        else:
            continue
        triggers.append(stamp)
    return pd.DatetimeIndex(triggers)