import itertools
from random import choices
from typing import Literal
import numpy as np
import pandas as pd
import ray
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, \
    mean_squared_log_error, mean_absolute_percentage_error, median_absolute_error, \
    accuracy_score, recall_score, precision_score, f1_score, roc_auc_score
from onetick.ml.interfaces import BaseEvaluator
from onetick.ml.utils import logger, root_mean_squared_error, symmetric_mean_absolute_percentage_error

METRIC_TYPE = Literal['R2', 'MAE', 'RMSE', 'MSLE', 'MAPE', 'SMAPE', 'ACC', 'REC', 'PREC', 'F1', 'ROC_AUC']


# TODO: Rename to MetricsEvaluator
class BaseMethodEvaluator(BaseEvaluator):
    """Base class for evaluators that use a simple loss function with the interface `(y_test, predict)`.

    Override `_evaluator_method` to set the loss function.

    Attributes
    ----------
    _evaluator_method : function
        loss function with interface `(y_test, predict)`
    """

    _evaluator_method = None

    def evaluate(self, y_test: pd.DataFrame, predict: pd.DataFrame):
        """Evaluate loss by comparing `y_test` and `predict`.

        Parameters
        ----------
        y_test : pandas.DataFrame
            Ground truth (correct) target values.
        predict : pandas.DataFrame
            Estimated target values.

        Returns
        -------
        float
            calculated loss
        """
        # `_evaluator_method` is stored as a plain function on the class, so accessing it via
        # `self` wraps it in a bound method; `__func__` unwraps it before calling.
        return self._evaluator_method.__func__(y_test, predict)

    @property
    def name(self):
        return self.__class__.__name__.replace("Evaluator", "")

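# Illustrative sketch (not part of the library): how the `BaseMethodEvaluator` subclasses
# defined below are typically used. The 'TARGET' column name and the sample values are
# assumptions made only for this example.
#
#     y_true = pd.DataFrame({'TARGET': [1.0, 2.0, 3.0]})
#     y_pred = pd.DataFrame({'TARGET': [1.1, 1.9, 3.2]})
#     MAEEvaluator().evaluate(y_true, y_pred)   # ~0.133
#     MAEEvaluator().name                       # 'MAE'
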
class OneStepPredictionIntervals(BaseEvaluator):
    """Evaluator for one-step prediction intervals:
    https://otexts.com/fpp3/prediction-intervals.html
    """

    def evaluate(self, y, prediction: pd.DataFrame, z_value: float = 1.96):
        """Evaluate one-step prediction intervals using the standard deviation of the residuals.

        Parameters
        ----------
        y : pandas.DataFrame
            Ground truth (correct) target values.
        prediction : pandas.DataFrame
            Estimated target values.
        z_value : float
            z-value for the confidence interval. Default is 1.96 for a 95% confidence interval.

        Returns
        -------
        dict
            calculated one-step prediction interval half-width (`<column>_DELTA`) for each target column
        """
        # squared residuals per target column
        residuals_sq = pd.DataFrame()
        for column in y.columns:
            residuals_sq[f'{column}_ERROR_SQ'] = (y[column] - prediction[column]) ** 2
        n = len(residuals_sq)
        # sum of squared residuals; divided by (n - 1) below to get the residual variance
        sum_sq_residuals = residuals_sq.sum()
        delta_vals = {}
        for column in y.columns:
            delta_vals[f'{column}_DELTA'] = z_value * (sum_sq_residuals[f'{column}_ERROR_SQ'] / (n - 1)) ** 0.5
        return delta_vals

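# Illustrative sketch (an assumption, not part of the library): turning the deltas returned
# above into interval bounds, following the one-step formula `prediction +/- z * sigma_residuals`
# from the FPP3 reference. The 'VOLUME' column name is an assumption for this example only.
#
#     deltas = OneStepPredictionIntervals().evaluate(y_test, prediction)
#     lower = prediction['VOLUME'] - deltas['VOLUME_DELTA']
#     upper = prediction['VOLUME'] + deltas['VOLUME_DELTA']
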
class BootstrapPredictionIntervals(BaseEvaluator):  # pragma: no cover
    # NOT WORKING YET
    # https://saattrupdan.github.io/2020-03-01-bootstrap-prediction/

    def __init__(self):
        pass

    # TODO: this calculation currently works only with a single VOLUME column;
    #       it needs to be generalized to any number of target columns.
    def evaluate(self, experiment=None, bucket_size: int = 39, resampling_num: int = 5, alpha: float = 0.05):
        """Calculate prediction intervals with block-bootstrap resampling of the training data.

        Parameters
        ----------
        experiment : Experiment or inherited class
            instance of `Experiment` or an inherited class.
        bucket_size : int
            Size of the block used for bootstrapping.
        resampling_num : int
            Number of resamples.
        alpha : float
            Significance level of the interval (e.g. 0.05 for a 95% interval).

        Returns
        -------
        pandas.DataFrame
            prediction for each test row together with `LOWER_OFFSET`/`UPPER_OFFSET`
            and the resulting `VOLUME_LOWER_BORDER`/`VOLUME_UPPER_BORDER` columns
        """
        class _Experiment(experiment.__class__):
            pass

        if experiment.val_params['val_type'] in ['None', None]:
            _Experiment.val_params = {**experiment.val_params, 'val_type': 'Simple'}
        # TODO: extract cur_model_params from the locally saved model when loading a model (if possible)
        _Experiment.model_params = {k: [v] for k, v in experiment.current_model_params.items()}
        # save the prediction on the test sample before it is overwritten by the next experiment.predict call
        prediction_test_original = experiment.prediction_reverse_processed
        logger.debug(f'prediction_test_original: {prediction_test_original}')
        train_preds = experiment.predict(experiment.x_train, preproc_reverse=False)
        train_residuals = experiment.y_train['VOLUME'] - train_preds['VOLUME']
        print('experiment.y_train', experiment.y_train)
        print('train_preds', train_preds)
        print('train_residuals', train_residuals)
        y_test_unprocessed_original = experiment.y_unprocessed
        print('y_test_unprocessed_original', y_test_unprocessed_original)
        x_test_original = experiment.x_test
        proc_df_original = experiment.proc_df
        all_idx = experiment.df.index
        test_idx = x_test_original.index
        # determine all train sample indexes, including those cut during preprocessing
        train_indexes = list(all_idx.difference(test_idx, sort=False))
        bucket_num = len(train_indexes) // bucket_size
        if bucket_num < 50:
            logger.warning('Too few buckets; the calculation may not be representative.')
        bucket_indexes = [train_indexes[bucket_size * i:bucket_size * (i + 1)] for i in range(bucket_num)]
        bucket_remainder = train_indexes[bucket_size * bucket_num:bucket_size * (bucket_num + 1)]
        train_samples_indexes = [list(itertools.chain.from_iterable(choices(bucket_indexes, k=len(bucket_indexes)))) +
                                 bucket_remainder for _ in range(resampling_num)]
        val_residuals = []
        bootstrap_test_preds = np.zeros([len(x_test_original), resampling_num])
        # bootstrap_test_preds = np.empty(resampling_num)
        for i, train_sample_indexes in enumerate(train_samples_indexes):
            # TODO: the experiment should decide which indexes are used (positional or Time);
            #       in the current implementation reset_index below is required for intraday_averaging to work correctly.
            #       Update: the intraday_averaging implementation has changed and prediction intervals were not re-tested after that!
            df = experiment.df.loc[train_sample_indexes].copy(deep=True)
            df = df.reset_index(drop=True)
            new_exp = _Experiment()
            # TODO: this TODO is already noted in the experiment;
            #       add a prepare_data parameter to skip the train-test split
            x_train1, x_train2, y_train1, y_train2 = new_exp.prepare_data(df=df)
            x_train = pd.concat([x_train1, x_train2])
            y_train = pd.concat([y_train1, y_train2])
            new_exp.init_fit(x_train=x_train, y_train=y_train)
            bootstrap_val_pred = new_exp.predict(x=new_exp.x_val, preproc_reverse=False)
            val_residuals.append(new_exp.y_val['VOLUME'] - bootstrap_val_pred['VOLUME'])
            print('val_residual', val_residuals[-1])
            # TODO: do we need to do reverse processing here?
            bootstrap_test_pred = new_exp.predict(x=x_test_original, proc_df=proc_df_original)
            # bootstrap_test_pred = new_exp.predict(x=x_test_original, preproc_reverse=False)
            print('bootstrap_test_pred', bootstrap_test_pred)
            bootstrap_test_preds[:, i] = np.ravel(bootstrap_test_pred['VOLUME'])
            # bootstrap_test_preds[i] = bootstrap_test_pred['VOLUME']
        print('bootstrap_test_preds', bootstrap_test_preds)
        ray.shutdown()
        # centre each test point's bootstrap predictions around their mean across resamples
        bootstrap_test_preds -= np.atleast_2d(np.mean(bootstrap_test_preds, axis=1)).T
        # bootstrap_test_preds -= np.mean(bootstrap_test_preds)
        val_residuals = np.concatenate(val_residuals)
        print('val_residuals', val_residuals)
        print('bootstrap_test_preds', bootstrap_test_preds)
        val_residuals = np.percentile(val_residuals, q=np.arange(100))
        print('val_residuals', val_residuals)
        train_residuals = np.percentile(train_residuals, q=np.arange(100))
        print('train_residuals', train_residuals)
        # TODO: do the permutation again for each new test data point?
        # TODO: should we use BLOCK permutation here?
        no_information_error = np.mean(np.abs(np.random.permutation(experiment.y_train['VOLUME']) -
                                              np.random.permutation(train_preds['VOLUME'])
                                              )
                                       )
        print('no_information_error', no_information_error)
        generalisation = np.abs(val_residuals.mean() - train_residuals.mean())
        print('generalisation', generalisation)
        no_information_val = np.abs(no_information_error - train_residuals)
        print('no_information_val', no_information_val)
        relative_overfitting_rate = np.mean(generalisation / no_information_val)
        print('relative_overfitting_rate', relative_overfitting_rate)
        # .632+ bootstrap weight: blends training and validation residuals depending on the overfitting rate
        weight = .632 / (1 - .368 * relative_overfitting_rate)
        print('weight', weight)
        residuals = (1 - weight) * train_residuals + weight * val_residuals
        print('residuals', residuals)
        C = []
        for bootstrap_test_pred in bootstrap_test_preds:
            C.append(np.array([m + o for m in bootstrap_test_pred for o in residuals]))
        print(len(C))
        print(len(C[-1]))
        qs = [100 * alpha / 2, 100 * (1 - alpha / 2)]
        print('qs', qs)
        percentiles = []
        for c in C:
            percentiles.append(np.percentile(c, q=qs))
        print(len(percentiles))
        print('percentiles', percentiles)
        percentiles_df = pd.DataFrame(percentiles,
                                      columns=['LOWER_OFFSET', 'UPPER_OFFSET'],
                                      index=prediction_test_original.index
                                      ).join([prediction_test_original, y_test_unprocessed_original])
        print('percentiles_df', percentiles_df)
        percentiles_df['VOLUME_LOWER_BORDER'] = percentiles_df['VOLUME'] + percentiles_df['LOWER_OFFSET']
        percentiles_df['VOLUME_UPPER_BORDER'] = percentiles_df['VOLUME'] + percentiles_df['UPPER_OFFSET']
        print('result_df', percentiles_df)
        return percentiles_df

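# Minimal, self-contained sketch (an assumption, not used by BootstrapPredictionIntervals above)
# of the core idea from the bootstrap-prediction reference: resample residuals in contiguous
# blocks and take percentiles of the pooled resamples as interval offsets around a prediction.
# The helper name and its default parameters are hypothetical.
def _block_bootstrap_offsets(residuals: np.ndarray,
                             bucket_size: int = 39,
                             resampling_num: int = 5,
                             alpha: float = 0.05):
    """Return (lower_offset, upper_offset) percentiles of block-resampled residuals."""
    # split residuals into contiguous blocks of `bucket_size`
    buckets = [residuals[i:i + bucket_size] for i in range(0, len(residuals), bucket_size)]
    # resample blocks with replacement `resampling_num` times and pool the result
    resampled = np.concatenate([np.concatenate(choices(buckets, k=len(buckets)))
                                for _ in range(resampling_num)])
    lower, upper = np.percentile(resampled, q=[100 * alpha / 2, 100 * (1 - alpha / 2)])
    return lower, upper
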
class R2Evaluator(BaseMethodEvaluator):
    """ R2 score evaluator """
    _evaluator_method = r2_score


class MAEEvaluator(BaseMethodEvaluator):
    """ MAE score evaluator """
    _evaluator_method = mean_absolute_error


class MSEEvaluator(BaseMethodEvaluator):
    """ MSE score evaluator """
    _evaluator_method = mean_squared_error


class MSLEEvaluator(BaseMethodEvaluator):
    """ MSLE score evaluator """
    _evaluator_method = mean_squared_log_error


class MAPEEvaluator(BaseMethodEvaluator):
    """ MAPE score evaluator """
    _evaluator_method = mean_absolute_percentage_error


class MdAEEvaluator(BaseMethodEvaluator):
    """ MdAE score evaluator """
    _evaluator_method = median_absolute_error


class RMSEEvaluator(BaseMethodEvaluator):
    """ RMSE score evaluator """
    _evaluator_method = root_mean_squared_error


class SMAPEEvaluator(BaseMethodEvaluator):
    """ SMAPE score evaluator """
    _evaluator_method = symmetric_mean_absolute_percentage_error


class AccuracyEvaluator(BaseMethodEvaluator):
    """ Accuracy score evaluator """
    _evaluator_method = accuracy_score


class RecallEvaluator(BaseMethodEvaluator):
    """ Recall score evaluator """
    _evaluator_method = recall_score


class PrecisionEvaluator(BaseMethodEvaluator):
    """ Precision score evaluator """
    _evaluator_method = precision_score


class F1Evaluator(BaseMethodEvaluator):
    """ F1 score evaluator """
    _evaluator_method = f1_score


class RocAucEvaluator(BaseMethodEvaluator):
    """ ROC AUC score evaluator """
    _evaluator_method = roc_auc_score
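
# Illustrative sketch (an assumption; no such registry exists in this module): the names in
# METRIC_TYPE correspond to the evaluator classes above, so a string-based lookup could be
# built like this if needed.
#
#     _EVALUATORS_BY_NAME = {
#         'R2': R2Evaluator, 'MAE': MAEEvaluator, 'RMSE': RMSEEvaluator,
#         'MSLE': MSLEEvaluator, 'MAPE': MAPEEvaluator, 'SMAPE': SMAPEEvaluator,
#         'ACC': AccuracyEvaluator, 'REC': RecallEvaluator, 'PREC': PrecisionEvaluator,
#         'F1': F1Evaluator, 'ROC_AUC': RocAucEvaluator,
#     }
#     evaluator = _EVALUATORS_BY_NAME['RMSE']()
#     score = evaluator.evaluate(y_test, predict)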