# Source code for onetick.ml.impl.models.regressor_models

from abc import ABCMeta
import functools

import catboost
import lightgbm
import mlflow
import numpy as np
import pandas as pd
import sklearn.ensemble
import sklearn.tree
import xgboost
from joblib import dump, load
from lightgbm import early_stopping, log_evaluation
from mlflow.models.signature import infer_signature
from onetick.ml.impl.experiments.experiment import DEFAULT_VERBOSE

from onetick.py.docs.utils import param_doc, docstring
from onetick.ml.interfaces import BaseModel
from onetick.ml.utils import CAT_MSLE, XGB_MAPE, logger

_init_params_doc = param_doc(
    name='init_params',
    desc="""
    Parameters passed directly to the initialization of the native model.
    """,
    annotation=dict
)
_fit_params_doc = param_doc(
    name='fit_params',
    desc="""
    Parameters passed directly to native model `.fit()` method.
    """,
    annotation=dict
)
_dsf_params_doc = param_doc(
    name='dsf_params',
    desc="""
    Dictionary which includes: `overfitting_params` and `loss` param (see `experiment.init_fit()`)
    """,
    annotation=dict
)
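
# Illustrative shapes of the three dictionaries documented above (hypothetical values;
# the keys under `dsf_params` are the ones actually read by the models below):
#
#     init_params = {'n_estimators': 200, 'max_depth': 6}    # native model __init__ kwargs
#     fit_params = {'sample_weight': None}                    # native model .fit() kwargs
#     dsf_params = {'loss': 'RMSE', 'eval_metric': 'MAE',     # framework-level settings
#                   'early_stopping_rounds': 50, 'verbose': 100}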


class RegressorModel(BaseModel, metaclass=ABCMeta):
    """
    Abstract base class for features-targets models.
    """

    mlflow_register = None
    model_class = None
    loss_map = None
    metric_map = None

    @docstring(parameters=[_init_params_doc, _fit_params_doc], add_self=True)
    def __init__(self, init_params=None, fit_params=None):
        self.init_params = init_params or {}
        self.fit_params = fit_params or {}
        self._model_params = None
        self._fit_params = None
        self._eval_set = None
        self._dsf_params = {}
        self.is_val_set_empty = None
        self.model = None
        self.sklearn_searcher = None
        super().__init__(init_params=self.init_params,
                         fit_params=self.fit_params)
    @docstring(parameters=[_dsf_params_doc, _init_params_doc], add_self=True)
    def init_model(self, dsf_params=None, init_params=None):
        """
        Initialize model with parameters.
        """
        if init_params is None:
            init_params = self.init_params
        dsf_params = dsf_params or {}
        self._dsf_params = dsf_params.copy()
        self._model_params = init_params.copy()
        self.model = None
        self.sklearn_searcher = None
    @property
    def dsf_verbose(self):
        return self._dsf_params.get('verbose', DEFAULT_VERBOSE)
    def get_model_params(self):
        """
        Override this method to update ``_model_params`` before returning it.
        """
        return self._model_params
    def get_fit_params(self):
        """
        Override this method to update fit parameters before returning them.
        """
        fit_params = self.fit_params.copy()
        if self._eval_set:
            fit_params['eval_set'] = self._eval_set
        return fit_params
    def _set_verbose(self):
        self._model_params['verbose'] = self._get_verbose_value()

    def _get_verbose_value(self, model_verbose=None):
        if model_verbose is None:
            model_verbose = self._model_params.get('verbose', None)
        if model_verbose is not None:
            if isinstance(model_verbose, int):
                model_verbose = [model_verbose]
            if self.dsf_verbose != model_verbose:
                logger.warning('Verbose parameter is specified at the model level, '
                               'so it overrides the verbose parameter at the framework level.')
            return model_verbose
        else:
            return [self.dsf_verbose]

    def _fit(self, x_train, y_train, **kwargs):
        if self.sklearn_searcher:
            return self.sklearn_searcher.fit(x_train, y_train, **kwargs)
        else:
            return self.model.fit(x_train, y_train, **kwargs)
    def fit(self, x_train, y_train, eval_set=None):
        """
        Train the model on X-Y examples (X - features, Y - targets).

        Parameters
        ----------
        x_train : pandas.DataFrame, numpy.ndarray, or any model-compatible type
            Data with features for model training.
        y_train : pandas.DataFrame, numpy.ndarray, or any model-compatible type
            Data with targets for model training. Must be the same length as `x_train`.
        eval_set : list of (X, y) tuple pairs, optional
            List of (X, y) tuple pairs to use as validation sets for early stopping.

        Returns
        -------
        model_result : Any
            Returned only for models whose native ``fit()`` returns the trained model
            (i.e. training is not in-place). Depends on which model class is used.
        """
        if self.model is None:
            raise AttributeError("Cannot run .fit() without .model attribute set to any native model class.")
        self._eval_set = eval_set
        return self._fit(x_train, y_train, **self.get_fit_params())
    def predict(self, x_test: pd.DataFrame, **kwargs):
        """
        Predict Y from X using an already trained model (X - features, Y - targets).

        Parameters
        ----------
        x_test : pandas.DataFrame
            Data with features used to predict Y values.

        Returns
        -------
        y_pred : numpy.ndarray
            Predicted Y values.
        """
        prediction = self.model.predict(x_test)
        return prediction
    def save_model(self, *args, **kwargs):
        """
        Save the model to a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the native `model.save_model()` function.
        kwargs : dict
            Keyword arguments passed directly to the native `model.save_model()` function.
        """
        self.model.save_model(*args, **kwargs)
    def load_model(self, *args, experiment=None, **kwargs):
        """
        Load a model from a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the native `model.load_model()` function.
        kwargs : dict
            Keyword arguments passed directly to the native `model.load_model()` function.

        Returns
        -------
        Any
            Loaded ML model (depends on which model class is used).
        """
        self.model.load_model(*args, **kwargs)
        return self.model
    def _choose_model_loss(self, dsf_loss, init_loss):
        if init_loss is not None:
            if isinstance(init_loss, list):
                if len(init_loss) > 1:
                    logger.warning(f'At the moment, it is forbidden to use the loss function as an iteration '
                                   f'parameter in a grid search; the first value of the list will be used: '
                                   f'{init_loss[0]}')
                init_loss = init_loss[0]
            logger.warning(f'Loss function {init_loss} is specified at the model level, '
                           f'so it overrides the loss parameter {dsf_loss} at the framework level.')
            return init_loss
        return dsf_loss

    def infer_signature(self, x_test, y_test):
        return infer_signature(x_test, y_test)
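
# A minimal usage sketch of the lifecycle shared by the subclasses below (hypothetical
# frames `x_train`/`y_train`/`x_test`; in practice the experiment drives these calls
# and also sets `is_val_set_empty`):
#
#     model = XGBRegressor(init_params={'n_estimators': 100})
#     model.init_model(dsf_params={'loss': 'RMSE'})
#     model.fit(x_train, y_train)
#     y_pred = model.predict(x_test)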
class SklearnRegressorModel(RegressorModel):
    """
    Base class for models with the scikit-learn interface.
    """

    mlflow_register = mlflow.sklearn
    loss_map = {'MAE': 'absolute_error',
                'RMSE': 'squared_error',
                'MSLE': 'squared_error',
                'MAPE': 'squared_error'}

    @docstring(parameters=[_dsf_params_doc, _init_params_doc])
    def init_model(self, dsf_params={}, init_params={}):
        """
        Initialize sklearn model.
        """
        assert self.model_class is not None, 'SKLearn native model class not specified in .model_class attr!'
        super().init_model(dsf_params, init_params)
        self.model = self.model_class(**self.get_model_params())

    def get_model_params(self):
        self._model_params = super().get_model_params()
        self._set_loss()
        return self._model_params

    def _set_verbose(self):
        # sklearn models take the verbose parameter in the fit() call, not in __init__()
        pass

    def _set_loss(self):
        if self._dsf_params.get('loss', 'RMSE') not in ['MAE', 'RMSE']:
            logger.warning(f'Currently, {self.__class__.__name__} model supports only 2 types of loss functions: '
                           'MAE and RMSE. Default value: RMSE.')
        dsf_loss = self.loss_map.get(self._dsf_params.get('loss', 'RMSE'), 'squared_error')
        init_loss = self._model_params.pop('criterion', None)
        self._model_params['criterion'] = self._choose_model_loss(dsf_loss, init_loss)

    def fit(self, x_train, y_train, eval_set=None):
        """
        Train sklearn model on X-Y examples (X - features, Y - targets).

        Parameters
        ----------
        x_train : pandas.DataFrame, numpy.ndarray, or any model-compatible type
            Data with features for model training.
        y_train : pandas.DataFrame, numpy.ndarray, or any model-compatible type
            Data with targets for model training. Must be the same length as `x_train`.
        eval_set : list of (X, y) tuple pairs, optional
            Sklearn models have no early stopping here, so the first validation pair,
            if provided, is concatenated to the training data.

        Returns
        -------
        model_result : Any
            Returned only for models whose native ``fit()`` returns the trained model
            (i.e. training is not in-place). Depends on which model class is used.
        """
        if self.model is None:
            raise Exception("Running .fit() without model being initialized")
        x_train_val = x_train
        y_train_val = y_train
        if eval_set:
            if eval_set[0][0] is not None and eval_set[0][1] is not None:
                x_train_val = pd.concat([x_train, eval_set[0][0]])
                y_train_val = pd.concat([y_train, eval_set[0][1]])
        return super().fit(x_train_val, np.ravel(y_train_val.values))

    def save_model(self, *args, **kwargs):
        """
        Save the sklearn model to a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the `joblib.dump()` function.
        kwargs : dict
            Keyword arguments passed directly to the `joblib.dump()` function.
        """
        dump(self.model, *args, **kwargs)

    def load_model(self, *args, experiment=None, **kwargs):
        """
        Load a sklearn model from a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the `joblib.load()` function.
        kwargs : dict
            Keyword arguments passed directly to the `joblib.load()` function.

        Returns
        -------
        model : Any (depends on which model class is used)
            Loaded sklearn ML model.
        """
        model = load(*args, **kwargs)
        return model


class BaseKerasRegressor(RegressorModel):
    """
    Deep Neural Network (multilayer perceptron) model.
    """
""" mlflow_register = mlflow.tensorflow get_model_func = None def init_model(self, dsf_params=None, init_params=None): super().init_model(dsf_params=dsf_params, init_params=init_params) from scikeras.wrappers import KerasRegressor from tensorflow.keras.callbacks import EarlyStopping callbacks = [] if not self.is_val_set_empty: if self._dsf_params.get('early_stopping_rounds', 0): early_stopping_ = EarlyStopping(monitor='val_loss', patience=self._dsf_params['early_stopping_rounds'], restore_best_weights=True) callbacks = [early_stopping_] self.model = KerasRegressor( functools.partial(self.get_model_func), **self.get_model_params(), callbacks=callbacks, ) def get_model_params(self): model_params = super().get_model_params() model_params['verbose'] = self._get_verbose_value(model_params.get('verbose'))[0] model_params['loss'] = self._get_model_loss(model_params.get('loss')) return model_params def _get_model_loss(self, model_loss): dsf_loss = self._dsf_params.get('loss', 'MSE') if dsf_loss == 'RMSE': dsf_loss = 'MSE' return self._choose_model_loss(dsf_loss, model_loss) def get_fit_params(self): fit_params = super().get_fit_params() fit_params['epochs'] = fit_params.get('epochs', 16) fit_params['shuffle'] = fit_params.get('shuffle', False) if 'eval_set' in fit_params: fit_params['validation_data'] = fit_params.pop('eval_set')[0] return fit_params def save_model(self, *args, **kwargs): """ Saving of a DNN-model to a local file. Parameters ---------- args : list Arguments goes directly to `joblib.dump()` function. kwargs : dict Keyword arguments goes directly to `joblib.dump()` function. """ self.model.model_.save(*args, **kwargs) def load_model(self, *args, experiment=None, **kwargs): """ Loading a DNN-model from a local file. Parameters ---------- args : list Arguments goes directly to native `model.load_model()` function. kwargs : dict Keyword arguments goes directly to `model.load_model()` function. Returns ------- model : KerasRegressor Loaded DNN-model. """ from scikeras.wrappers import KerasRegressor from tensorflow.keras.models import load_model model = load_model(*args, **kwargs) sci_model = KerasRegressor(model) sci_model.initialize(experiment.x_processed, experiment.y_processed) return sci_model def get_mlflow_model(self): return self.model.model_ def infer_signature(self, x_test, y_test): return infer_signature(x_test.to_numpy(), y_test.to_numpy())
class DNNRegressor(BaseKerasRegressor):
    """
    Common Deep Neural Network model (Keras-based).

    Parameters
    ----------
    init_params : dict
        Dictionary with parameters for model initialization and customization. It includes:

        hid_layers_num : int
            Number of hidden layers.
        neurons_num_layerN : int
            Number of neurons in layer N (N - integer >= 1).
        dropout_layerN : float
            Dropout of layer N (N - integer >= 1). Value >= 0 and < 1.
        activation_layerN : Any
            Activation function of layer N (N - integer >= 1).
        optimizer : Any
            Optimizer used in network training.
    fit_params : dict
        Dictionary with parameters for the model's ``fit()`` function.
    """

    # TODO: Pass kwargs into model.compile()
    def get_model_func(self, meta, compile_kwargs, **kwargs):
        from tensorflow.keras.layers import Dense, Dropout, Input
        from tensorflow.keras.models import Sequential
        model = Sequential()
        # shape must be a tuple, hence the trailing comma
        model.add(Input(shape=(meta['n_features_in_'],)))
        for layer in range(1, kwargs.get('hid_layers_num', 2) + 1):
            model.add(Dense(kwargs.get(f'neurons_num_layer{layer}', 4),
                            activation=kwargs.get(f'activation_layer{layer}', 'relu')))
            if kwargs.get(f'dropout_layer{layer}', 0) > 0:
                model.add(Dropout(kwargs[f'dropout_layer{layer}']))
        model.add(Dense(1))
        model.compile(loss=compile_kwargs["loss"], optimizer=compile_kwargs["optimizer"])
        return model
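
# A hypothetical DNNRegressor configuration matching the per-layer parameter scheme
# above (two hidden layers, dropout after the first):
#
#     DNNRegressor(init_params={'hid_layers_num': 2,
#                               'neurons_num_layer1': 16, 'dropout_layer1': 0.2,
#                               'neurons_num_layer2': 8, 'activation_layer2': 'tanh',
#                               'optimizer': 'adam'},
#                  fit_params={'epochs': 32})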
class XGBRegressor(RegressorModel):
    """
    XGBoost regressor model.
    """

    mlflow_register = mlflow.xgboost
    model_class = xgboost.XGBRegressor
    # TODO: find a proper MAE objective in the xgboost lib and use it for the MAE loss instead of pseudo-huber
    loss_map = {'MAE': 'reg:pseudohubererror',
                'RMSE': 'reg:squarederror',
                'MSLE': 'reg:squaredlogerror',
                'MAPE': XGB_MAPE}
    metric_map = {'R2': 'r2',
                  'MAE': 'mae',
                  'RMSE': 'rmse',
                  'MSLE': 'rmsle',
                  'MAPE': 'mape'}
    def init_model(self, dsf_params={}, init_params={}):
        """
        Init XGBRegressor model.

        Parameters
        ----------
        dsf_params : dict
            Dictionary which includes `overfitting_params` and the `loss` param (see `experiment.init_fit()`).
        init_params : dict
            Parameters passed directly to the initialization of the native XGBRegressor model.
        """
        super().init_model(dsf_params=dsf_params, init_params=init_params)
        self._set_model_loss()
        self._set_early_stopping()
        self.model = self.model_class(**self.get_model_params())
    def _set_early_stopping(self):
        dsf_metric = self._dsf_params.get('eval_metric', 'MAE')
        eval_metric = self.metric_map.get(dsf_metric)
        early_stopping_rounds = self._dsf_params.get('early_stopping_rounds', None)
        if self.is_val_set_empty:
            eval_metric = None
            early_stopping_rounds = None
        self._model_params['eval_metric'] = eval_metric
        self._model_params['early_stopping_rounds'] = early_stopping_rounds

    def _set_model_loss(self):
        dsf_loss = self.loss_map.get(self._dsf_params.get('loss', 'RMSE'), 'reg:squarederror')
        model_loss = self._model_params.pop('objective', None)
        self._model_params['objective'] = self._choose_model_loss(dsf_loss, model_loss)
    def get_fit_params(self):
        fit_params = super().get_fit_params()
        fit_params['verbose'] = bool(self._get_verbose_value(fit_params.get('verbose'))[0])
        return fit_params
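
# For example (hypothetical call), initializing with
# dsf_params={'loss': 'MSLE', 'eval_metric': 'RMSE', 'early_stopping_rounds': 20}
# resolves to objective='reg:squaredlogerror' and eval_metric='rmse' on the native
# xgboost.XGBRegressor via loss_map and metric_map above (early stopping is dropped
# when the validation set is empty).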
class CatBoostRegressor(RegressorModel):
    """
    CatBoostRegressor model.
    """

    mlflow_register = mlflow.catboost
    model_class = catboost.CatBoostRegressor
    def init_model(self, dsf_params=None, init_params=None):
        """
        Init CatBoostRegressor model.

        Parameters
        ----------
        dsf_params : dict
            Dictionary which includes `overfitting_params` and the `loss` param (see `experiment.init_fit()`).
        init_params : dict
            Parameters passed directly to the initialization of the native CatBoostRegressor model.
        """
        super().init_model(dsf_params, init_params)
        self.model = self.model_class(**self.get_model_params())
    def get_model_params(self):
        self._set_model_loss()
        self._set_early_stopping()
        self._set_verbose()
        return self._model_params
    def _set_early_stopping(self):
        eval_metric = self._dsf_params.get('eval_metric', 'MAE')
        early_stopping_rounds = self._dsf_params.get('early_stopping_rounds', None)
        if self.is_val_set_empty:
            eval_metric = None
            early_stopping_rounds = None
        self._model_params['eval_metric'] = eval_metric
        self._model_params['early_stopping_rounds'] = early_stopping_rounds

    def _set_model_loss(self):
        dsf_loss = self._dsf_params.get('loss', 'RMSE')
        if dsf_loss == 'MSLE':
            dsf_loss = CAT_MSLE()
        model_loss = self._model_params.pop('loss_function', None)
        self._model_params['loss_function'] = self._choose_model_loss(dsf_loss, model_loss)

    def _set_verbose(self):
        model_verbose = self._model_params.get('verbose', None)
        if model_verbose is not None:
            if self.dsf_verbose != model_verbose:
                logger.warning('Verbose parameter is specified at the model level, '
                               'so it overrides the verbose parameter at the framework level.')
            self._model_params['verbose'] = model_verbose
        else:
            self._model_params['verbose'] = self.dsf_verbose
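
# Note: when the framework-level loss is 'MSLE', the custom CAT_MSLE objective object
# from onetick.ml.utils is substituted for the loss string, since catboost accepts
# user-defined objective objects where no matching built-in loss exists.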
class LGBMRegressor(RegressorModel):
    """
    LightGBM regressor model.
    """

    mlflow_register = mlflow.lightgbm
    model_class = lightgbm.LGBMRegressor
    # TODO: 'regression' is the L2 loss (~RMSE); implement MAE, MSLE and MAPE objectives
    loss_map = {'MAE': 'regression',
                'RMSE': 'regression',
                'MSLE': 'regression',
                'MAPE': 'regression'}
    # TODO: implement R2 and MSLE metrics
    metric_map = {'R2': 'mae',
                  'MAE': 'mae',
                  'RMSE': 'rmse',
                  'MSLE': 'mae',
                  'MAPE': 'mape'}
    def init_model(self, dsf_params=None, init_params=None):
        super().init_model(dsf_params, init_params)
        self.model = self.model_class(**self.get_model_params())
    def get_model_params(self):
        init_params = super().get_model_params()
        # define the loss function
        dsf_loss = self._dsf_params.get('loss', 'RMSE')
        if dsf_loss not in ['RMSE']:
            logger.warning(f'Loss function {dsf_loss} is not implemented yet in the current model, '
                           f'RMSE will be used.')
        loss = self.loss_map.get(dsf_loss, 'regression')
        self._model_params['objective'] = self._choose_model_loss(loss, init_params.pop('objective', None))
        # define verbose
        verbose = self._dsf_params.get('verbose', DEFAULT_VERBOSE)
        model_verbose = init_params.pop('verbose', None)
        if model_verbose is not None:
            logger.warning('Verbose parameter is specified at the model level, so it overrides the verbose '
                           'parameter at the framework level.')
            verbose = model_verbose[0]
        # remember early-stopping params for get_fit_params()
        self.eval_metric = self.metric_map.get(self._dsf_params.get('eval_metric', 'MAE'))
        self.early_stopping_rounds = self._dsf_params.get('early_stopping_rounds', None)
        self.verbose = verbose
        return init_params
    def get_fit_params(self):
        fit_params = super().get_fit_params()
        if fit_params is None:
            fit_params = {}
        verbose_ = log_evaluation(period=self.verbose)
        callbacks = [verbose_]
        if not self.is_val_set_empty:
            if self.early_stopping_rounds:
                early_stopping_ = early_stopping(stopping_rounds=self.early_stopping_rounds,
                                                 verbose=self.verbose)
                callbacks = [early_stopping_, verbose_]
        fit_params['callbacks'] = callbacks
        fit_params['eval_metric'] = self.eval_metric
        return fit_params
    def save_model(self, *args, **kwargs):
        """
        Save the LGBMRegressor model to a local file.

        Parameters
        ----------
        args : list
            Arguments passed directly to the native `model.booster_.save_model()` function.
        kwargs : dict
            Keyword arguments passed directly to the native `model.booster_.save_model()` function.
        """
        self.model.booster_.save_model(*args, **kwargs)
    def load_model(self, path, experiment=None, **kwargs):
        """
        Load a model from a local file.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to the model file.
        kwargs : dict
            Keyword arguments passed directly to the native `lightgbm.Booster` constructor.

        Returns
        -------
        model : lightgbm.Booster
            Loaded LGBMRegressor model.
        """
        self.model = lightgbm.Booster(model_file=path, **kwargs)
        return self.model
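
# Note that save/load is asymmetric for this class (a sketch, assuming an already
# trained LGBMRegressor instance `model`):
#
#     model.save_model('lgbm_model.txt')             # dumps the underlying Booster to a text file
#     booster = model.load_model('lgbm_model.txt')   # returns a raw lightgbm.Booster, not the wrapper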
# TODO Update CascadeForestRegressor model
class CascadeForestRegressor(RegressorModel):  # pragma: no cover
    # TODO: _joblib_parallel_args was removed in the latest scikit-learn dev build; need to pin the sklearn
    #  version or update the deepforest lib. https://github.com/scikit-learn-contrib/imbalanced-learn/issues/894
    def init_model(self, dsf_params={}, init_params={}):
        super().init_model(dsf_params=dsf_params, init_params=init_params)
        from deepforest import CascadeForestRegressor
        self.model = CascadeForestRegressor(**self.get_model_params())
    def fit(self, x_train, y_train, eval_set=None):
        return super().fit(x_train.values, np.ravel(y_train.values), eval_set=eval_set)
class DecisionTreeRegressor(SklearnRegressorModel):
    """
    DecisionTreeRegressor sklearn model.
    """

    model_class = sklearn.tree.DecisionTreeRegressor
class RandomForestRegressor(SklearnRegressorModel):
    """
    RandomForestRegressor sklearn model.
    """

    model_class = sklearn.ensemble.RandomForestRegressor
class DummyModel:
    """
    Dummy model class used in tests to reduce running time.
    Doesn't fit or train; just implements the methods to call.
    """

    def fit(self, x_train, y_train, **kwargs):
        self._real_fit_params = kwargs.copy()
        return self

    def predict(self, x):
        return np.zeros(len(x))
class DummyRegressor(RegressorModel):
    """
    Dummy regressor class used in tests.
    Doesn't fit or train; just implements the methods to call.
    """
    def init_model(self, dsf_params={}, init_params={}):
        super().init_model(dsf_params=dsf_params, init_params=init_params)
        self.get_model_params()
        self.model = DummyModel()
    def fit(self, x_train, y_train, eval_set=None):
        return self._fit(x_train, y_train, **self.get_fit_params())