DataSynthis_ML_JobTask / src /features.py
moccaram's picture
Replace v1 demo with v2 XGBoost-backed Gradio app (reference-backed rebuild)
8ba081b verified
"""Fractional differentiation β€” AFML Ch.5 Β§5.4 (BonusPDF p.46).
Why this module exists
----------------------
Log-returns achieve stationarity but destroy memory: the binomial weights
``(1-B)^d`` collapse to ``[1, -1, 0, 0, ...]`` at ``d=1``. For ``d ∈ (0, 1)``
the weights decay as a long power-law tail, so the series stays stationary
while retaining a long memory of past prices (Table 5.1 in AFML shows most
liquid futures reach ADF stationarity at ``d < 0.6``, and the majority at
``d < 0.3``).
This is a port of AFML Snippets 5.1, 5.3, 5.4 (BonusPDF pp.48, 51, 53).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from scipy.special import gamma
def get_ffd_weights(d: float, thres: float = 1e-5, max_size: int = 1024) -> np.ndarray:
"""Binomial-series weights for the fractional-differencing operator ``(1-B)^d``.
Cuts the series off once ``|w_k| < thres``. Uses ``scipy.special.gamma`` for
a vectorized closed form rather than the recursive loop in AFML Snippet 5.1
β€” same values, faster and avoids accumulated float error in long series.
Returns
-------
np.ndarray of shape ``(n,)`` ordered from oldest to newest:
``[w_{n-1}, w_{n-2}, ..., w_1, w_0]`` so the dot product with
``series[t-n+1 : t+1]`` is the differenced value at ``t``.
"""
k = np.arange(max_size)
with np.errstate(invalid="ignore", divide="ignore"):
w = (-1) ** k * gamma(d + 1) / (gamma(k + 1) * gamma(d - k + 1))
w = np.nan_to_num(w, nan=0.0, posinf=0.0, neginf=0.0)
cutoff = np.argmax(np.abs(w) < thres) if np.any(np.abs(w) < thres) else max_size
if cutoff == 0:
cutoff = max_size
return w[:cutoff][::-1]
def frac_diff_ffd(series: pd.Series | pd.DataFrame, d: float, thres: float = 1e-5) -> pd.DataFrame:
"""Fixed-width fractional differencing β€” AFML Snippet 5.3 (BonusPDF p.51).
The fixed-width window keeps weights stable through time (unlike the
expanding-window variant in Snippet 5.2 which downweights early observations).
"""
if isinstance(series, pd.Series):
series = series.to_frame()
w = get_ffd_weights(d, thres=thres) # shape (width+1,)
width = len(w) - 1
out = {}
for col in series.columns:
s = series[[col]].ffill().dropna()
if len(s) <= width:
out[col] = pd.Series(index=s.index[width:], dtype=float)
continue
values = s[col].to_numpy()
# Vectorized: build a (n_out, width+1) sliding-window matrix and dot with w
from numpy.lib.stride_tricks import sliding_window_view
windows = sliding_window_view(values, width + 1)
diffed = windows @ w
out[col] = pd.Series(diffed, index=s.index[width:])
return pd.concat(out, axis=1)
def find_min_d(series: pd.Series, d_range=(0.0, 1.0), n_steps: int = 11, thres: float = 1e-5) -> pd.DataFrame:
"""Sweep ``d`` and return ADF stat + correlation β€” AFML Snippet 5.4 (BonusPDF p.53).
Use to pick the smallest ``d`` for which the FFD-differenced log-price passes
the ADF stationarity test at 95% (statistic < critical value β‰ˆ -2.86).
Returns a frame indexed by ``d`` with columns: ``adf_stat, p_value, n_obs,
crit_95, corr_with_original``.
"""
from statsmodels.tsa.stattools import adfuller
log_series = np.log(series.dropna()).to_frame(name=series.name or "value")
results = {}
for d in np.linspace(d_range[0], d_range[1], n_steps):
diffed = frac_diff_ffd(log_series, d, thres=thres).dropna()
if len(diffed) < 50:
continue
col = diffed.columns[0]
adf = adfuller(diffed[col], maxlag=1, regression="c", autolag=None)
aligned = log_series.loc[diffed.index, col]
corr = float(aligned.corr(diffed[col]))
results[round(d, 3)] = {
"adf_stat": adf[0],
"p_value": adf[1],
"n_obs": adf[3],
"crit_95": adf[4]["5%"],
"corr_with_original": corr,
}
return pd.DataFrame(results).T.rename_axis("d")
def rolling_zscore(series: pd.Series, window: int = 252, min_periods: int | None = None) -> pd.Series:
"""Rolling z-score with leak-free statistics (uses only the trailing window).
Stronger than a single fit-on-train ``StandardScaler`` because regime shifts
don't carry stale means forward into the test set.
"""
min_periods = min_periods or max(window // 4, 20)
mu = series.rolling(window=window, min_periods=min_periods).mean()
sd = series.rolling(window=window, min_periods=min_periods).std()
return (series - mu) / sd.replace(0, np.nan)