Source code for onetick.ml.impl.evaluators.default_evaluators

import itertools
from random import choices
from typing import Literal

import numpy as np
import pandas as pd
import ray
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, \
    mean_squared_log_error, mean_absolute_percentage_error, median_absolute_error, \
    accuracy_score, recall_score, precision_score, f1_score, roc_auc_score

from onetick.ml.interfaces import BaseEvaluator
from onetick.ml.utils import logger, root_mean_squared_error, symmetric_mean_absolute_percentage_error

METRIC_TYPE = Literal['R2', 'MAE', 'RMSE', 'MSLE', 'MAPE', 'SMAPE', 'ACC', 'REC', 'PREC', 'F1', 'ROC_AUC']


# TODO: Rename to MetricsEvaluator
[docs]class BaseMethodEvaluator(BaseEvaluator): """Base class for evaluators using simple loss function with interface `(y_test, predict)`. Override `_evaluator_method` to set loss function. Attributes ---------- _evaluator_method : function loss function with interface `(y_test, predict)` """ _evaluator_method = None
[docs] def evaluate(self, y_test: pd.DataFrame, predict: pd.DataFrame): """Evaluate loss by comparing `y_test` and `predict`. Parameters ---------- y_test : pandas.DataFrame Ground truth (correct) target values. predict : pandas.DataFrame Estimated target values. Returns ------- float calculated loss """ return self._evaluator_method.__func__(y_test, predict)
@property def name(self): return self.__class__.__name__.replace("Evaluator", "")
[docs]class OneStepPredictionIntervals(BaseEvaluator): """Evaluator for one-step prediction intervals: https://otexts.com/fpp3/prediction-intervals.html """
[docs] def evaluate(self, y, prediction: pd.DataFrame, z_value: float = 1.96): """Evaluate one-step prediction interval using the standard deviation of the residuals. Parameters ---------- y : pandas.DataFrame Ground truth (correct) target values. prediction : pandas.DataFrame Estimated target values. z_value : float z-value for confidence interval. Default is 1.96 for 95% confidence interval. Returns ------- pandas.DataFrame calculated one-step prediction interval for each target column """ residuals_sq = pd.DataFrame() for column in y.columns: residuals_sq[f'{column}_ERROR_SQ'] = (y[column] - prediction[column]) ** 2 n = len(residuals_sq) var_residuals = residuals_sq.sum() delta_vals = {} for column in y.columns: delta_vals[f'{column}_DELTA'] = z_value * (var_residuals[f'{column}_ERROR_SQ'] / (n - 1)) ** 0.5 return delta_vals
[docs]class BootstrapPredictionIntervals(BaseEvaluator): # pragma: no cover # NOT WORKING YET # https://saattrupdan.github.io/2020-03-01-bootstrap-prediction/ def __init__(self): pass # TODO in this way calculation will work only with 1 Volume column, it needs to generalize for any amount of column
[docs] def evaluate(self, experiment=None, bucket_size: int = 39, resampling_num: int = 5, alpha: float = 0.05): """Calculate confidence interval with feature sampling Parameters ---------- experiment : Experiment or inherited class instance of `Experiment` or inherited class. bucket_size : int Size of block for bootstrapping. resampling_num : int Number of resamples. alpha : float The prediction uncertainty. Returns ------- tuple of (float, float) tuple of calculated mean and standard deviation """ class _Experiment(experiment.__class__): pass if experiment.val_params['val_type'] in ['None', None]: _Experiment.val_params = {**experiment.val_params, 'val_type': 'Simple'} # TODO Need to extract cur_model_params from local saved model when we loading model (if possible) _Experiment.model_params = {k: [v] for k, v in experiment.current_model_params.items()} # save prediction on test sample before it will be overwritten in the next experiment.predict call prediction_test_original = experiment.prediction_reverse_processed logger.debug('prediction_test_original: {prediction_test_original}') train_preds = experiment.predict(experiment.x_train, preproc_reverse=False) train_residuals = experiment.y_train['VOLUME'] - train_preds['VOLUME'] print('experiment.y_train', experiment.y_train) print('train_preds', train_preds) print('train_residuals', train_residuals) y_test_unprocessed_original = experiment.y_unprocessed print('y_test_unprocessed_original', y_test_unprocessed_original) x_test_original = experiment.x_test proc_df_original = experiment.proc_df all_idx = experiment.df.index test_idx = x_test_original.index # determine all train sample indexes including cutting one during preprocessing train_indexes = list(all_idx.difference(test_idx, sort=False)) bucket_num = len(train_indexes) // bucket_size if bucket_num < 50: logger.warning('Too small amount of buckets, the calculation may not be representative.') bucket_indexes = [train_indexes[bucket_size * i:bucket_size * (i + 1)] for i in range(bucket_num)] bucket_reminder = train_indexes[bucket_size * bucket_num:bucket_size * (bucket_num + 1)] train_samples_indexes = [list(itertools.chain.from_iterable(choices(bucket_indexes, k=len(bucket_indexes)))) + bucket_reminder for _ in range(resampling_num)] val_residuals = [] bootstrap_test_preds = np.zeros([len(x_test_original), resampling_num]) # bootstrap_test_preds = np.empty(resampling_num) for i, train_sample_indexes in enumerate(train_samples_indexes): # TODO it needs to decide in the experiment which indexes we use count or Time, # in the current implementation I should to use reset_index below to correct working of intraday_averaging # Update! I've changed intraday_averaging implementation and did not test prediction intervals after that! df = experiment.df.loc[train_sample_indexes].copy(deep=True) df = df.reset_index(drop=True) new_exp = _Experiment() # TODO this todo is already written in the experiment, # add prepare_data parameter for do not splitting train-test x_train1, x_train2, y_train1, y_train2 = new_exp.prepare_data(df=df) x_train = pd.concat([x_train1, x_train2]) y_train = pd.concat([y_train1, y_train2]) new_exp.init_fit(x_train=x_train, y_train=y_train) bootstrap_val_pred = new_exp.predict(x=new_exp.x_val, preproc_reverse=False) val_residuals.append(new_exp.y_val['VOLUME'] - bootstrap_val_pred['VOLUME']) print('val_residual', val_residuals[-1]) # TODO do we need to do reverse processing here? bootstrap_test_pred = new_exp.predict(x=x_test_original, proc_df=proc_df_original) # bootstrap_test_pred = new_exp.predict(x=x_test_original, preproc_reverse=False) print('bootstrap_test_pred', bootstrap_test_pred) bootstrap_test_preds[:, i] = np.ravel(bootstrap_test_pred['VOLUME']) # bootstrap_test_preds[i] = bootstrap_test_pred['VOLUME'] print('bootstrap_test_preds', bootstrap_test_preds) ray.shutdown() bootstrap_test_preds -= np.atleast_2d(np.mean(bootstrap_test_preds, axis=1)).T # bootstrap_test_preds -= np.mean(bootstrap_test_preds) val_residuals = np.concatenate(val_residuals) print('val_residuals', val_residuals) print('bootstrap_test_preds', bootstrap_test_preds) val_residuals = np.percentile(val_residuals, q=np.arange(100)) print('val_residuals', val_residuals) train_residuals = np.percentile(train_residuals, q=np.arange(100)) print('train_residuals', train_residuals) # TODO: Do permutation each time for each new test data point? # TODO: Should we use BLOCK permutation here? no_information_error = np.mean(np.abs(np.random.permutation(experiment.y_train['VOLUME']) - np.random.permutation(train_preds['VOLUME']) ) ) print('no_information_error', no_information_error) generalisation = np.abs(val_residuals.mean() - train_residuals.mean()) print('generalisation', generalisation) no_information_val = np.abs(no_information_error - train_residuals) print('no_information_val', no_information_val) relative_overfitting_rate = np.mean(generalisation / no_information_val) print('relative_overfitting_rate', relative_overfitting_rate) weight = .632 / (1 - .368 * relative_overfitting_rate) print('weight', weight) residuals = (1 - weight) * train_residuals + weight * val_residuals print('residuals', residuals) C = [] for bootstrap_test_pred in bootstrap_test_preds: C.append(np.array([m + o for m in bootstrap_test_pred for o in residuals])) print(len(C)) print(len(C[-1])) qs = [100 * alpha / 2, 100 * (1 - alpha / 2)] print('qs', qs) percentiles = [] for c in C: percentiles.append(np.percentile(c, q=qs)) print(len(percentiles)) print('percentiles', percentiles) percentiles_df = pd.DataFrame(percentiles, columns=['LOWER_OFFSET', 'UPPER_OFFSET'], index=prediction_test_original.index ).join([prediction_test_original, y_test_unprocessed_original]) print('percentiles_df', percentiles_df) percentiles_df['VOLUME_LOWER_BORDER'] = percentiles_df['VOLUME'] + percentiles_df['LOWER_OFFSET'] percentiles_df['VOLUME_UPPER_BORDER'] = percentiles_df['VOLUME'] + percentiles_df['UPPER_OFFSET'] print('result_df', percentiles_df) return percentiles_df
[docs]class R2Evaluator(BaseMethodEvaluator): """ R2 score evaluator """ _evaluator_method = r2_score
[docs]class MAEEvaluator(BaseMethodEvaluator): """ MAE score evaluator """ _evaluator_method = mean_absolute_error
[docs]class MSEEvaluator(BaseMethodEvaluator): """ MSE score evaluator """ _evaluator_method = mean_squared_error
[docs]class MSLEEvaluator(BaseMethodEvaluator): """ MSLE score evaluator """ _evaluator_method = mean_squared_log_error
[docs]class MAPEEvaluator(BaseMethodEvaluator): """ MAPE score evaluator. """ _evaluator_method = mean_absolute_percentage_error
[docs]class MdAEEvaluator(BaseMethodEvaluator): """ MdAE score evaluator """ _evaluator_method = median_absolute_error
[docs]class RMSEEvaluator(BaseMethodEvaluator): """ RMSE score evaluator """ _evaluator_method = root_mean_squared_error
[docs]class SMAPEEvaluator(BaseMethodEvaluator): """ SMAPE score evaluator """ _evaluator_method = symmetric_mean_absolute_percentage_error
[docs]class AccuracyEvaluator(BaseMethodEvaluator): """ Accuracy score evaluator """ _evaluator_method = accuracy_score
[docs]class RecallEvaluator(BaseMethodEvaluator): """ Recall score evaluator """ _evaluator_method = recall_score
[docs]class PrecisionEvaluator(BaseMethodEvaluator): """ Precision score evaluator """ _evaluator_method = precision_score
[docs]class F1Evaluator(BaseMethodEvaluator): """ F1 score evaluator """ _evaluator_method = f1_score
[docs]class RocAucEvaluator(BaseMethodEvaluator): """ ROC AUC score evaluator """ _evaluator_method = roc_auc_score