import copy
from abc import ABCMeta
import functools
import catboost
import lightgbm
import mlflow
import numpy as np
import pandas as pd
import sklearn.ensemble
import sklearn.tree
import xgboost
from joblib import dump, load
from lightgbm import early_stopping, log_evaluation
from mlflow.models.signature import infer_signature
from onetick.ml.impl.experiments.experiment import DEFAULT_VERBOSE
from onetick.py.docs.utils import param_doc, docstring
from onetick.ml.interfaces import BaseModel
from onetick.ml.utils import CAT_MSLE, XGB_MAPE, logger
_init_params_doc = param_doc(
name='init_params',
desc="""
Parameters passed directly to the initialization of the native model.
""",
annotation=dict
)
_fit_params_doc = param_doc(
name='fit_params',
desc="""
Parameters passed directly to native model `.fit()` method.
""",
annotation=dict
)
_dsf_params_doc = param_doc(
name='dsf_params',
desc="""
Dictionary which includes: `overfitting_params` and `loss` param (see `experiment.init_fit()`)
""",
annotation=dict
)
class RegressorModel(BaseModel, metaclass=ABCMeta):
    """
    Abstract base class for feature-to-target regression models.
    """
mlflow_register = None
model_class = None
loss_map = None
metric_map = None
@docstring(parameters=[_init_params_doc, _fit_params_doc], add_self=True)
def __init__(self, init_params=None, fit_params=None):
self.init_params = init_params or {}
self.fit_params = fit_params or {}
self._model_params = None
self._fit_params = None
self._eval_set = None
self._dsf_params = {}
self.is_val_set_empty = None
self.model = None
self.sklearn_searcher = None
        super().__init__(init_params=self.init_params, fit_params=self.fit_params)
    @docstring(parameters=[_dsf_params_doc, _init_params_doc], add_self=True)
def init_model(self, dsf_params=None, init_params=None):
"""
Initialize model with parameters.
"""
if init_params is None:
init_params = self.init_params
dsf_params = dsf_params or {}
self._dsf_params = dsf_params.copy()
self._model_params = init_params.copy()
self.model = None
self.sklearn_searcher = None
@property
def dsf_verbose(self):
return self._dsf_params.get('verbose', DEFAULT_VERBOSE)
    def get_model_params(self):
"""
Override this method to update _model_params before returning it.
"""
return self._model_params
    def get_fit_params(self):
        """
        Override this method to update fit parameters before returning them.
        """
fit_params = self.fit_params.copy()
if self._eval_set:
fit_params['eval_set'] = self._eval_set
return fit_params
def _set_verbose(self):
self._model_params['verbose'] = self._get_verbose_value()
def _get_verbose_value(self, model_verbose=None):
if model_verbose is None:
model_verbose = self._model_params.get('verbose', None)
if model_verbose is not None:
if isinstance(model_verbose, int):
model_verbose = [model_verbose]
if self.dsf_verbose != model_verbose:
logger.warning('Verbose parameter is specified at the model level, '
'so it overrides the verbose parameter at the framework level.')
return model_verbose
else:
return [self.dsf_verbose]
def _fit(self, x_train, y_train, **kwargs):
if self.sklearn_searcher:
return self.sklearn_searcher.fit(x_train, y_train, **kwargs)
else:
return self.model.fit(x_train, y_train, **kwargs)
    def fit(self, x_train, y_train, eval_set=None):
        """
        Train the model on X-Y examples (X - features, Y - targets).

        Parameters
        ----------
        x_train : pandas.DataFrame, numpy.ndarray, or any model-compatible type
            Data with features for model training.
        y_train : pandas.DataFrame, numpy.ndarray, or any model-compatible type
            Data with targets for model training. Must be the same length as `x_train`.
        eval_set : list of (X, y) tuple pairs, optional
            Validation sets used for early stopping.

        Returns
        -------
        model_result : Any
            The trained model, for model classes whose `fit()` returns it
            (rather than training in place). The exact type depends on the
            model class used.
        """
if self.model is None:
raise AttributeError("Cannot run .fit() without .model attribute set to any native model class.")
self._eval_set = eval_set
return self._fit(x_train, y_train, **self.get_fit_params())
    def predict(self, x_test: pd.DataFrame, **kwargs):
        """
        Predict Y from X using the already trained model (X - features, Y - targets).

        Parameters
        ----------
        x_test : DataFrame
            Data with features used to predict Y values.
        kwargs : dict
            Keyword arguments passed directly to the native `model.predict()` method.

        Returns
        -------
        y_pred : numpy.ndarray
            Predicted Y values.
        """
        prediction = self.model.predict(x_test, **kwargs)
        return prediction
    def save_model(self, *args, **kwargs):
        """
        Save the model to a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the native `model.save_model()` function.
        kwargs : dict
            Keyword arguments passed directly to the native `model.save_model()` function.
        """
        self.model.save_model(*args, **kwargs)
    def load_model(self, *args, experiment=None, **kwargs):
        """
        Load a model from a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the native `model.load_model()` function.
        experiment : Experiment, optional
            Unused here; kept for interface compatibility with subclasses that
            need experiment data to restore the model.
        kwargs : dict
            Keyword arguments passed directly to the native `model.load_model()` function.

        Returns
        -------
        Any
            Loaded ML model (the exact type depends on the model class used).
        """
        self.model.load_model(*args, **kwargs)
        return self.model
def _choose_model_loss(self, dsf_loss, init_loss):
if init_loss is not None:
if isinstance(init_loss, list):
if len(init_loss) > 1:
                    logger.warning(f'Using the loss function as an iteration parameter in a grid search '
                                   f'is not supported yet; the first value of the list will be used: {init_loss[0]}')
init_loss = init_loss[0]
logger.warning(f'Loss function {init_loss} is specified at the model level, '
f'so it overrides the loss parameter {dsf_loss} at the framework level.')
return init_loss
return dsf_loss
def infer_signature(self, x_test, y_test):
return infer_signature(x_test, y_test)
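
# A minimal sketch of the subclass contract defined above (hypothetical data;
# any concrete regressor in this module follows the same flow):
#
#     model = XGBRegressor(init_params={'n_estimators': 100})
#     model.init_model(dsf_params={'loss': 'RMSE', 'verbose': 0})
#     model.fit(x_train, y_train, eval_set=[(x_val, y_val)])
#     y_pred = model.predict(x_test)
#
# `init_model()` must be called before `fit()`; otherwise `fit()` raises AttributeError.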
class SklearnRegressorModel(RegressorModel):
    """
    Base class for models with the scikit-learn interface.
    """
mlflow_register = mlflow.sklearn
loss_map = {'MAE': 'absolute_error',
'RMSE': 'squared_error',
'MSLE': 'squared_error',
'MAPE': 'squared_error'}
@docstring(parameters=[_dsf_params_doc, _init_params_doc])
    def init_model(self, dsf_params=None, init_params=None):
"""
Initialize sklearn model.
"""
assert self.model_class is not None, 'SKLearn native model class not specified in .model_class attr!'
super().init_model(dsf_params, init_params)
self.model = self.model_class(**self.get_model_params())
def get_model_params(self):
self._model_params = super().get_model_params()
self._set_loss()
return self._model_params
def _set_verbose(self):
# SKLearn gets verbose parameter from fit() function call, not init
pass
    def _set_loss(self):
if self._dsf_params.get('loss', 'RMSE') not in ['MAE', 'RMSE']:
logger.warning(f'Currently, {self.__class__.__name__} model supports only 2 types of loss functions: '
'MAE and RMSE. Default value: RMSE.')
dsf_loss = self.loss_map.get(self._dsf_params.get('loss', 'RMSE'), 'squared_error')
init_loss = self._model_params.pop('criterion', None)
self._model_params['criterion'] = self._choose_model_loss(dsf_loss, init_loss)
    def fit(self, x_train, y_train, eval_set=None):
        """
        Train the sklearn model on X-Y examples (X - features, Y - targets).

        Parameters
        ----------
        x_train : pandas.DataFrame, numpy.ndarray, or any model-compatible type
            Data with features for model training.
        y_train : pandas.DataFrame, numpy.ndarray, or any model-compatible type
            Data with targets for model training. Must be the same length as `x_train`.
        eval_set : list of (X, y) tuple pairs, optional
            Sklearn models have no native early stopping, so the validation pair
            is concatenated onto the training data before fitting.

        Returns
        -------
        model_result : Any
            The trained model, for model classes whose `fit()` returns it
            (rather than training in place). The exact type depends on the
            model class used.
        """
        if self.model is None:
            raise AttributeError("Cannot run .fit() without the model being initialized; call .init_model() first.")
x_train_val = x_train
y_train_val = y_train
if eval_set:
if eval_set[0][0] is not None and eval_set[0][1] is not None:
x_train_val = pd.concat([x_train, eval_set[0][0]])
y_train_val = pd.concat([y_train, eval_set[0][1]])
return super().fit(x_train_val, np.ravel(y_train_val.values))
    def save_model(self, *args, **kwargs):
        """
        Save the sklearn model to a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the `joblib.dump()` function.
        kwargs : dict
            Keyword arguments passed directly to the `joblib.dump()` function.
        """
dump(self.model, *args, **kwargs)
    def load_model(self, *args, experiment=None, **kwargs):
        """
        Load a sklearn model from a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the `joblib.load()` function.
        kwargs : dict
            Keyword arguments passed directly to the `joblib.load()` function.

        Returns
        -------
        model : Any
            Loaded sklearn ML model (the exact type depends on the model class used).
        """
model = load(*args, **kwargs)
return model
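
# Note on the eval_set handling above: plain sklearn estimators have no native
# early stopping, so the validation pair is folded back into the training data.
# A minimal sketch (hypothetical frames x_train, y_train, x_val, y_val):
#
#     model = RandomForestRegressor(init_params={'n_estimators': 50})
#     model.init_model(dsf_params={'loss': 'MAE'})
#     # (x_val, y_val) is concatenated onto (x_train, y_train) before fitting:
#     model.fit(x_train, y_train, eval_set=[(x_val, y_val)])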
class BaseKerasRegressor(RegressorModel):
    """
    Base class for Keras-backed (deep neural network) regressors.
    """
mlflow_register = mlflow.tensorflow
get_model_func = None
def init_model(self, dsf_params=None, init_params=None):
super().init_model(dsf_params=dsf_params, init_params=init_params)
from scikeras.wrappers import KerasRegressor
from tensorflow.keras.callbacks import EarlyStopping
callbacks = []
if not self.is_val_set_empty:
if self._dsf_params.get('early_stopping_rounds', 0):
early_stopping_ = EarlyStopping(monitor='val_loss',
patience=self._dsf_params['early_stopping_rounds'],
restore_best_weights=True)
callbacks = [early_stopping_]
self.model = KerasRegressor(
functools.partial(self.get_model_func),
**self.get_model_params(),
callbacks=callbacks,
)
def get_model_params(self):
model_params = super().get_model_params()
model_params['verbose'] = self._get_verbose_value(model_params.get('verbose'))[0]
model_params['loss'] = self._get_model_loss(model_params.get('loss'))
return model_params
def _get_model_loss(self, model_loss):
dsf_loss = self._dsf_params.get('loss', 'MSE')
if dsf_loss == 'RMSE':
dsf_loss = 'MSE'
return self._choose_model_loss(dsf_loss, model_loss)
def get_fit_params(self):
fit_params = super().get_fit_params()
fit_params['epochs'] = fit_params.get('epochs', 16)
fit_params['shuffle'] = fit_params.get('shuffle', False)
if 'eval_set' in fit_params:
fit_params['validation_data'] = fit_params.pop('eval_set')[0]
return fit_params
    def save_model(self, *args, **kwargs):
        """
        Save the DNN model to a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the native Keras `model.save()` function.
        kwargs : dict
            Keyword arguments passed directly to the native Keras `model.save()` function.
        """
self.model.model_.save(*args, **kwargs)
    def load_model(self, *args, experiment=None, **kwargs):
        """
        Load a DNN model from a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the native Keras `load_model()` function.
        experiment : Experiment
            Experiment whose processed data is used to initialize the
            scikeras wrapper around the loaded Keras model.
        kwargs : dict
            Keyword arguments passed directly to the native Keras `load_model()` function.

        Returns
        -------
        model : KerasRegressor
            Loaded DNN model.
        """
from scikeras.wrappers import KerasRegressor
from tensorflow.keras.models import load_model
model = load_model(*args, **kwargs)
sci_model = KerasRegressor(model)
sci_model.initialize(experiment.x_processed, experiment.y_processed)
return sci_model
def get_mlflow_model(self):
return self.model.model_
def infer_signature(self, x_test, y_test):
return infer_signature(x_test.to_numpy(), y_test.to_numpy())
class DNNRegressor(BaseKerasRegressor):
    """
    Common Deep Neural Network (multilayer perceptron) model, Keras based.

    Parameters
    ----------
    init_params : dict
        Dictionary with parameters for model initialization and customization. It includes:

        hid_layers_num : int
            Number of hidden layers.
        neurons_num_layerN : int
            Number of neurons in layer N (N - integer >= 1).
        dropout_layerN : float
            Dropout rate of layer N (N - integer >= 1). Value >= 0 and < 1.
        activation_layerN : Any
            Activation function of layer N (N - integer >= 1).
        optimizer : Any
            Optimizer used in network training.
    fit_params : dict
        Dictionary with parameters for the `model.fit()` function.
    """
# TODO: Pass kwargs into model.compile()
def get_model_func(self, meta, compile_kwargs, **kwargs):
from tensorflow.keras.layers import Dense, Dropout, Input
from tensorflow.keras.models import Sequential
model = Sequential()
        model.add(Input(shape=(meta['n_features_in_'],)))
for layer in range(1, kwargs.get('hid_layers_num', 2) + 1):
model.add(Dense(kwargs.get(f'neurons_num_layer{layer}', 4),
activation=kwargs.get(f'activation_layer{layer}', 'relu')))
if kwargs.get(f'dropout_layer{layer}', 0) > 0:
model.add(Dropout(kwargs[f'dropout_layer{layer}']))
model.add(Dense(1))
model.compile(loss=compile_kwargs["loss"],
optimizer=compile_kwargs["optimizer"])
return model
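
# A hypothetical init_params dict illustrating the per-layer naming scheme that
# get_model_func() consumes (two hidden layers, dropout after the first; all
# values here are illustrative):
#
#     init_params = {
#         'hid_layers_num': 2,
#         'neurons_num_layer1': 32, 'activation_layer1': 'relu', 'dropout_layer1': 0.2,
#         'neurons_num_layer2': 16, 'activation_layer2': 'tanh',
#         'optimizer': 'adam',
#     }
#     model = DNNRegressor(init_params=init_params, fit_params={'epochs': 32})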
class XGBRegressor(RegressorModel):
"""
XGBoost regressor model.
"""
mlflow_register = mlflow.xgboost
model_class = xgboost.XGBRegressor
    # TODO: Find a true MAE loss in the xgboost lib and use it for the MAE loss implementation instead of pseudo-huber
loss_map = {'MAE': 'reg:pseudohubererror',
'RMSE': 'reg:squarederror',
'MSLE': 'reg:squaredlogerror',
'MAPE': XGB_MAPE}
metric_map = {'R2': 'r2',
'MAE': 'mae',
'RMSE': 'rmse',
'MSLE': 'rmsle',
'MAPE': 'mape'}
    def init_model(self, dsf_params=None, init_params=None):
        """
        Init XGBRegressor model.

        Parameters
        ----------
        dsf_params : dict
            Dictionary which includes the `overfitting_params` and `loss` params (see `experiment.init_fit()`).
        init_params : dict
            Parameters passed directly to the initialization of the native XGBRegressor model.
        """
super().init_model(dsf_params=dsf_params, init_params=init_params)
self._set_model_loss()
self._set_early_stopping()
self.model = self.model_class(**self.get_model_params())
def _set_early_stopping(self):
        dsf_metric = self._dsf_params.get('eval_metric', 'MAE')
        eval_metric = self.metric_map.get(dsf_metric)
early_stopping_rounds = self._dsf_params.get('early_stopping_rounds', None)
if self.is_val_set_empty:
eval_metric = None
early_stopping_rounds = None
self._model_params['eval_metric'] = eval_metric
self._model_params['early_stopping_rounds'] = early_stopping_rounds
    def _set_model_loss(self):
dsf_loss = self.loss_map.get(self._dsf_params.get('loss', 'RMSE'), 'reg:squarederror')
model_loss = self._model_params.pop('objective', None)
self._model_params['objective'] = self._choose_model_loss(dsf_loss, model_loss)
    def get_fit_params(self):
fit_params = super().get_fit_params()
fit_params['verbose'] = bool(self._get_verbose_value(fit_params.get('verbose'))[0])
return fit_params
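
# A minimal sketch of how dsf_params map onto native XGBoost parameters via
# loss_map and metric_map above (values here are illustrative):
#
#     model = XGBRegressor()
#     model.init_model(dsf_params={'loss': 'MSLE',               # -> objective='reg:squaredlogerror'
#                                  'eval_metric': 'RMSE',        # -> eval_metric='rmse'
#                                  'early_stopping_rounds': 10})
#
# When no validation set is available (is_val_set_empty), eval_metric and
# early_stopping_rounds are reset to None in _set_early_stopping().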
class CatBoostRegressor(RegressorModel):
"""
CatBoostRegressor model.
"""
mlflow_register = mlflow.catboost
model_class = catboost.CatBoostRegressor
    def init_model(self, dsf_params=None, init_params=None):
        """
        Init CatBoostRegressor model.

        Parameters
        ----------
        dsf_params : dict
            Dictionary which includes the `overfitting_params` and `loss` params (see `experiment.init_fit()`).
        init_params : dict
            Parameters passed directly to the initialization of the native CatBoostRegressor model.
        """
super().init_model(dsf_params, init_params)
self.model = self.model_class(**self.get_model_params())
    def get_model_params(self):
self._set_model_loss()
self._set_early_stopping()
self._set_verbose()
return self._model_params
def _set_early_stopping(self):
eval_metric = self._dsf_params.get('eval_metric', 'MAE')
early_stopping_rounds = self._dsf_params.get('early_stopping_rounds', None)
if self.is_val_set_empty:
eval_metric = None
early_stopping_rounds = None
self._model_params['eval_metric'] = eval_metric
self._model_params['early_stopping_rounds'] = early_stopping_rounds
def _set_model_loss(self):
dsf_loss = self._dsf_params.get('loss', 'RMSE')
if dsf_loss == 'MSLE':
dsf_loss = CAT_MSLE()
model_loss = self._model_params.pop('loss_function', None)
self._model_params['loss_function'] = self._choose_model_loss(dsf_loss, model_loss)
def _set_verbose(self):
model_verbose = self._model_params.get('verbose', None)
if model_verbose is not None:
if self.dsf_verbose != model_verbose:
logger.warning('Verbose parameter is specified at the model level, '
'so it overrides the verbose parameter at the framework level.')
self._model_params['verbose'] = model_verbose
else:
self._model_params['verbose'] = self.dsf_verbose
class LGBMRegressor(RegressorModel):
"""
LightGBM regressor model.
"""
mlflow_register = mlflow.lightgbm
model_class = lightgbm.LGBMRegressor
    # TODO: 'regression' == L2 loss ~ RMSE; implement MAE, MSLE, MAPE losses
loss_map = {'MAE': 'regression',
'RMSE': 'regression',
'MSLE': 'regression',
'MAPE': 'regression'}
    # TODO: implement R2 and MSLE metrics
metric_map = {'R2': 'mae',
'MAE': 'mae',
'RMSE': 'rmse',
'MSLE': 'mae',
'MAPE': 'mape'}
    def init_model(self, dsf_params=None, init_params=None):
super().init_model(dsf_params, init_params)
self.model = self.model_class(**self.get_model_params())
    def get_model_params(self):
init_params = super().get_model_params()
# define loss function
dsf_loss = self._dsf_params.get('loss', 'RMSE')
if dsf_loss not in ['RMSE']:
logger.warning(f'Loss function {dsf_loss} is not implemented yet in the current model, RMSE will be used.')
loss = self.loss_map.get(dsf_loss, 'regression')
self._model_params['objective'] = self._choose_model_loss(loss, init_params.pop('objective', None))
# define verbose
verbose = self._dsf_params.get('verbose', DEFAULT_VERBOSE)
model_verbose = init_params.pop('verbose', None)
        if model_verbose is not None:
            logger.warning('Verbose parameter is specified at the model level, so it overrides the verbose '
                           'parameter at the framework level.')
            verbose = model_verbose[0] if isinstance(model_verbose, (list, tuple)) else model_verbose
        # remember early-stopping params for get_fit_params()
self.eval_metric = self.metric_map.get(self._dsf_params.get('eval_metric', 'MAE'))
self.early_stopping_rounds = self._dsf_params.get('early_stopping_rounds', None)
self.verbose = verbose
return init_params
    def get_fit_params(self):
fit_params = super().get_fit_params()
if fit_params is None:
fit_params = {}
verbose_ = log_evaluation(period=self.verbose)
callbacks = [verbose_]
if not self.is_val_set_empty:
if self.early_stopping_rounds:
early_stopping_ = early_stopping(stopping_rounds=self.early_stopping_rounds, verbose=self.verbose)
callbacks = [early_stopping_, verbose_]
fit_params['callbacks'] = callbacks
fit_params['eval_metric'] = self.eval_metric
return fit_params
    def save_model(self, *args, **kwargs):
        """
        Save the LGBMRegressor model to a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the native `model.booster_.save_model()` function.
        kwargs : dict
            Keyword arguments passed directly to the native `model.booster_.save_model()` function.
        """
self.model.booster_.save_model(*args, **kwargs)
    def load_model(self, path, experiment=None, **kwargs):
        """
        Load a model from a local file.

        Parameters
        ----------
        path : str, pathlib.Path
            Path to the model file.
        kwargs : dict
            Keyword arguments passed directly to the native `lightgbm.Booster`.

        Returns
        -------
        model : lightgbm.Booster
            Loaded LGBMRegressor model as a raw booster.
        """
self.model = lightgbm.Booster(model_file=path, **kwargs)
return self.model
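
# A hedged round-trip sketch for the save/load pair above (hypothetical path;
# note that load_model() returns a raw lightgbm.Booster, not an LGBMRegressor
# wrapper, so it is used directly for prediction):
#
#     model.save_model('lgbm_model.txt')            # via booster_.save_model()
#     booster = model.load_model('lgbm_model.txt')
#     y_pred = booster.predict(x_test)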
# TODO Update CascadeForestRegressor model
class CascadeForestRegressor(RegressorModel):  # pragma: no cover
    """
    CascadeForestRegressor (deep forest) model.
    """
    # TODO: _joblib_parallel_args was removed in the latest scikit-learn dev build; need to pin the sklearn version
    #  or update the deepforest lib. https://github.com/scikit-learn-contrib/imbalanced-learn/issues/894
    def init_model(self, dsf_params=None, init_params=None):
        super().init_model(dsf_params=dsf_params, init_params=init_params)
        from deepforest import CascadeForestRegressor
        self.model = CascadeForestRegressor(**self.get_model_params())
    def fit(self, x_train, y_train, eval_set=None):
return super().fit(x_train.values, np.ravel(y_train.values), eval_set=eval_set)
class DecisionTreeRegressor(SklearnRegressorModel):
"""
DecisionTreeRegressor sklearn model.
"""
model_class = sklearn.tree.DecisionTreeRegressor
class RandomForestRegressor(SklearnRegressorModel):
"""
RandomForestRegressor sklearn model.
"""
model_class = sklearn.ensemble.RandomForestRegressor
class DummyModel:
    """
    Dummy model class used in tests to reduce running time.
    Doesn't fit or train; just implements the methods to call.
    """
def fit(self, x_train, y_train, **kwargs):
self._real_fit_params = kwargs.copy()
return self
def predict(self, x):
return np.zeros(len(x))
class DummyRegressor(RegressorModel):
    """
    Dummy regressor class used for tests. Doesn't fit or train; just implements the methods to call.
    """
    def init_model(self, dsf_params=None, init_params=None):
        super().init_model(dsf_params=dsf_params, init_params=init_params)
        self.get_model_params()
        self.model = DummyModel()

    def fit(self, x_train, y_train, eval_set=None):
result_model = self._fit(x_train, y_train, **self.get_fit_params())
return result_model
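
# DummyRegressor usage in tests, as a sketch: it skips real training, so fit()
# can be called right after init_model(), and predict() always returns zeros:
#
#     model = DummyRegressor()
#     model.init_model()
#     model.fit(x_train, y_train)
#     assert (model.predict(x_test) == 0).all()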