Spaces:
Sleeping
Sleeping
| """Fractional differentiation β AFML Ch.5 Β§5.4 (BonusPDF p.46). | |
| Why this module exists | |
| ---------------------- | |
| Log-returns achieve stationarity but destroy memory: the binomial weights | |
| ``(1-B)^d`` collapse to ``[1, -1, 0, 0, ...]`` at ``d=1``. For ``d β (0, 1)`` | |
| the weights decay as a long power-law tail, so the series stays stationary | |
| while retaining a long memory of past prices (Table 5.1 in AFML shows most | |
| liquid futures reach ADF stationarity at ``d < 0.6``, and the majority at | |
| ``d < 0.3``). | |
| This is a port of AFML Snippets 5.1, 5.3, 5.4 (BonusPDF pp.48, 51, 53). | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| import pandas as pd | |
| from scipy.special import gamma | |
| def get_ffd_weights(d: float, thres: float = 1e-5, max_size: int = 1024) -> np.ndarray: | |
| """Binomial-series weights for the fractional-differencing operator ``(1-B)^d``. | |
| Cuts the series off once ``|w_k| < thres``. Uses ``scipy.special.gamma`` for | |
| a vectorized closed form rather than the recursive loop in AFML Snippet 5.1 | |
| β same values, faster and avoids accumulated float error in long series. | |
| Returns | |
| ------- | |
| np.ndarray of shape ``(n,)`` ordered from oldest to newest: | |
| ``[w_{n-1}, w_{n-2}, ..., w_1, w_0]`` so the dot product with | |
| ``series[t-n+1 : t+1]`` is the differenced value at ``t``. | |
| """ | |
| k = np.arange(max_size) | |
| with np.errstate(invalid="ignore", divide="ignore"): | |
| w = (-1) ** k * gamma(d + 1) / (gamma(k + 1) * gamma(d - k + 1)) | |
| w = np.nan_to_num(w, nan=0.0, posinf=0.0, neginf=0.0) | |
| cutoff = np.argmax(np.abs(w) < thres) if np.any(np.abs(w) < thres) else max_size | |
| if cutoff == 0: | |
| cutoff = max_size | |
| return w[:cutoff][::-1] | |
| def frac_diff_ffd(series: pd.Series | pd.DataFrame, d: float, thres: float = 1e-5) -> pd.DataFrame: | |
| """Fixed-width fractional differencing β AFML Snippet 5.3 (BonusPDF p.51). | |
| The fixed-width window keeps weights stable through time (unlike the | |
| expanding-window variant in Snippet 5.2 which downweights early observations). | |
| """ | |
| if isinstance(series, pd.Series): | |
| series = series.to_frame() | |
| w = get_ffd_weights(d, thres=thres) # shape (width+1,) | |
| width = len(w) - 1 | |
| out = {} | |
| for col in series.columns: | |
| s = series[[col]].ffill().dropna() | |
| if len(s) <= width: | |
| out[col] = pd.Series(index=s.index[width:], dtype=float) | |
| continue | |
| values = s[col].to_numpy() | |
| # Vectorized: build a (n_out, width+1) sliding-window matrix and dot with w | |
| from numpy.lib.stride_tricks import sliding_window_view | |
| windows = sliding_window_view(values, width + 1) | |
| diffed = windows @ w | |
| out[col] = pd.Series(diffed, index=s.index[width:]) | |
| return pd.concat(out, axis=1) | |
| def find_min_d(series: pd.Series, d_range=(0.0, 1.0), n_steps: int = 11, thres: float = 1e-5) -> pd.DataFrame: | |
| """Sweep ``d`` and return ADF stat + correlation β AFML Snippet 5.4 (BonusPDF p.53). | |
| Use to pick the smallest ``d`` for which the FFD-differenced log-price passes | |
| the ADF stationarity test at 95% (statistic < critical value β -2.86). | |
| Returns a frame indexed by ``d`` with columns: ``adf_stat, p_value, n_obs, | |
| crit_95, corr_with_original``. | |
| """ | |
| from statsmodels.tsa.stattools import adfuller | |
| log_series = np.log(series.dropna()).to_frame(name=series.name or "value") | |
| results = {} | |
| for d in np.linspace(d_range[0], d_range[1], n_steps): | |
| diffed = frac_diff_ffd(log_series, d, thres=thres).dropna() | |
| if len(diffed) < 50: | |
| continue | |
| col = diffed.columns[0] | |
| adf = adfuller(diffed[col], maxlag=1, regression="c", autolag=None) | |
| aligned = log_series.loc[diffed.index, col] | |
| corr = float(aligned.corr(diffed[col])) | |
| results[round(d, 3)] = { | |
| "adf_stat": adf[0], | |
| "p_value": adf[1], | |
| "n_obs": adf[3], | |
| "crit_95": adf[4]["5%"], | |
| "corr_with_original": corr, | |
| } | |
| return pd.DataFrame(results).T.rename_axis("d") | |
| def rolling_zscore(series: pd.Series, window: int = 252, min_periods: int | None = None) -> pd.Series: | |
| """Rolling z-score with leak-free statistics (uses only the trailing window). | |
| Stronger than a single fit-on-train ``StandardScaler`` because regime shifts | |
| don't carry stale means forward into the test set. | |
| """ | |
| min_periods = min_periods or max(window // 4, 20) | |
| mu = series.rolling(window=window, min_periods=min_periods).mean() | |
| sd = series.rolling(window=window, min_periods=min_periods).std() | |
| return (series - mu) / sd.replace(0, np.nan) | |