Source code for onetick.ml.impl.experiments.experiment

import os
import copy
import ray
import joblib
import ray
import mlflow
import datetime
import pandas as pd
import onetick.py as otp
from functools import reduce
from typing import Optional, Union
from mlflow.tracking import MlflowClient
from ray.util.joblib import register_ray
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV

import onetick.ml
from onetick.ml.impl.data_pipelines.data_pipeline import SelectFeatures, SelectTargets
from onetick.ml.impl.data_pipelines.ot_pipeline import ToPandas
from onetick.ml.utils.schema import DataSchema
from onetick.ml.impl.evaluators import (BootstrapPredictionIntervals,
                                        OneStepPredictionIntervals)
from onetick.ml.interfaces import BaseExperiment
from onetick.ml.interfaces.data_pipelines import BaseFeatures, BaseFilter, BasePreprocess
from onetick.ml.utils import create_folds, logger, params_iterator

DEFAULT_VERBOSE = 0


class PreparePredictDataMixin:
    """
    Mixin that turns raw input data into a feature matrix for prediction.

    The host class must provide a `pipeline` attribute (iterable of feature
    names and/or operators) and a `schema` attribute.
    """

    def prepare_predict_data(self, df: pd.DataFrame):
        """
        Run the pipeline over `df` and return only the feature columns.

        Every pipeline item is handled in order:

        - a string is registered in the schema as a feature name;
        - any other item gets the shared schema assigned, is re-fitted when its
          `refit_on_predict` flag is truthy, and is then applied via `transform`.

        Parameters
        ----------
        df : pd.DataFrame
            Plain data to prepare for prediction.
            Format is expected to be the same as comes from datafeed.

        Returns
        -------
        x : pd.DataFrame
            Prepared data, suitable to be passed in `Experiment.predict()` method.
        """
        data = df
        for step in self.pipeline:
            if not isinstance(step, str):
                # operators share the schema object of the host
                step.schema = self.schema
                if step.refit_on_predict:
                    data = step.fit(data)
                data = step.transform(data)
            else:
                # bare strings are feature names, not operators
                self.schema.add_features(step)

        return data.loc[:, self.schema.features_columns]


[docs]class Experiment(BaseExperiment, PreparePredictDataMixin): """ Implements full cycle to run experiment. """ general = dict( experiment_name='default_experiment', log_models=False, mlflow_url=None, ) datafeeds = [] splitters = [] pipeline = [] models = [] evaluators = [] features_columns = [] target_columns = [] group_column_name = None # train params train_params = dict( loss='RMSE', verbose=1, overfitting={}, search_cv={}, n_jobs=-1, ) # prediction and evaluation params prediction_params = {'preprocessing_reverse': True} def __init__(self): # assert self.datafeeds, "datafeed must be defined to run experiment" # assert self.splitters, "splitters must be defined to run experiment" self.df = pd.DataFrame() self.src = None self.pipeline_query = None self.prediction, self.prediction_reverse_processed = None, None self._prediction_input = None self.mlflow_run = None self.results = {} self.dsf_model = None self.gscv_model = None self.native_model = None self.models_cv_score = None self.models_cv_results = None self.current_model_params = None self.model_signature = None self.model_result = None self.preproc_reverse = None self.current_metrics = None self.datafeed_hashes = {} self.restored_datafeed_hashes = None self.general: dict = {} if self.general is None else self.general self.train_params: dict = {} if self.train_params is None else self.train_params self.experiment_name: str = self.general.get('experiment_name', 'default_experiment') self.log_models = self.general.get('log_models', False) self.mlflow_url = self.general.get('mlflow_url', None) self.wrapped_model = None self.schema = DataSchema() self.schema.group_column_name = self.group_column_name if not self.features_columns: raise RuntimeError("No features columns selected. " "List features columns in Experiment.features_columns attribute.") if not self.target_columns: raise RuntimeError("No target columns selected. 
" "List target columns in Experiment.target_columns attribute.") def _use_mlflow(self) -> bool: """Boolean showing are able to use MLFlow.""" return bool(self.mlflow_url and self.experiment_name)
[docs] def serialize_config(self): """Dump current instance config to `dict`, that could be used to reconstruct class by `build_experiment_class()`. """ config = {} for attr in ['general', 'group_column_name', 'train_params', 'features_columns', 'target_columns']: config[attr] = getattr(self, attr) for attr in ['datafeeds', 'splitters', 'pipeline', "models", "evaluators"]: obj_list = [] for obj in getattr(self, attr): obj_cfg = copy.deepcopy(obj._init_params) obj_list.append(obj_cfg) config[attr] = obj_list config['prediction'] = { 'preprocessing_reverse': self.prediction_params['preprocessing_reverse'], } return config
# TODO Should we try always work with dataframes with Time column as index column?
[docs] def get_data(self, datetime_column: Optional[str] = None) -> Union[pd.DataFrame, otp.Source]: """ Function of loading external data (e.g. onetick or csv). If an experiment is set to use of several data feeds, then they are merged by index or by the `datetime_column` (if it is specified). Before returning the result, `reset_index` is applied to the dataframe, since in the current version of the framework, preprocessing is based on a numerical index, and not on a datetime field. Parameters ---------- datetime_column: str, optional The field by which the data is merged if the experiment contains several data feeds. Returns ---------- result: pd.DataFrame Dataframe with data that will participate in the subsequent stages of the experiment pipeline (`prepare_data()` function). Important! `reset_index()` was applied to the data (see the description of the function above). The result is also stored in the attribute of an instance of the class Experiment: experiment.df """ sources = [] for datafeed in self.datafeeds: datafeed.schema = self.schema src = datafeed.load() sources.append(src) if len(sources) > 1 and isinstance(sources[0], otp.Source): src = otp.merge(sources) elif len(sources) > 1 and isinstance(sources[0], pd.DataFrame): on_column = None left_index_column = False right_index_column = False if len(sources) > 1: if datetime_column is None: left_index_column = True right_index_column = True else: on_column = datetime_column src = reduce(lambda left, right: pd.merge(left, right, on=on_column, left_index=left_index_column, right_index=right_index_column), sources) elif len(sources) < 1: raise ValueError("No datafeeds were specified") if isinstance(src, pd.DataFrame): src = src.reset_index() self.src = src return src
[docs] def prepare_data(self, src: Optional[Union[pd.DataFrame, otp.Source]] = None): """ Function of preparing data. It includes following steps: features calculation, sample splitting, and data preprocessing. Parameters ---------- src: pd.DataFrame or otp.Source, optional Data to prepare for feeding into a machine learning model. If the parameter is not specified, then data obtained at the previous step of an experiment is taken (the `experiment.df` attribute defined at the `get_data()` function execution stage). Results ---------- The results are stored in the Experiment properties, which is pandas slices from full dataframe: experiment.x_train experiment.x_val experiment.x_test experiment.y_train experiment.y_val experiment.y_test In addition, you can refer to the following attributes: experiment.y_unprocessed - dataframe with not preprocessed (original) target columns, used to calculate metrics """ if src is None: # TODO copy is needed only for case with double call of prepare_data (is it still needed?) src = self.src.copy(deep=True) # source type defines context for schema and pipeline if isinstance(src, pd.DataFrame): self.schema.context = 'pandas' elif isinstance(src, otp.Source): self.schema.context = 'onetick' # share schema object between all operators for operator in self.datafeeds + self.splitters + self.pipeline: if isinstance(operator, str): continue operator.schema = self.schema # split data or mark all as train if self.splitters[0]: src = self.splitters[0].transform(src) else: src["__SPLIT"] = "TRAIN" # copy unprocessed target columns to separate columns with prefix "__UNPROCESSED.", # to save original values before running processing pipeline. # this values will be needed to calculate metrics on reverse preprocessed data. 
self.schema.unprocessed_target_columns = [] op = SelectTargets(self.target_columns) op.schema = self.schema src = op.fit_transform(src) # run the whole processing pipeline for proc in self.pipeline: # treat strings as feature names if isinstance(proc, str): self.schema.add_features(proc) continue # treat ToPandas() as context switch from onetick to pandas elif isinstance(proc, ToPandas): if self.schema.context == "pandas": raise ValueError("ToPandas() operator cannot be used in pandas context") self.schema.context = "pandas" self.pipeline_query = src src = self.datafeeds[0].run(src) continue # run operator src = proc.fit(src) src = proc.transform(src) # # track how target columns names are changed, # # to be able to calculate metrics on reverse preprocessed data # proc.update_target_columns() # check if dataframe is empty, and raise error if it is # useful for debugging pipelines if self.schema.context == "pandas" and src.empty: raise ValueError(f"DataFrame become empty on operator: {proc}") # TODO rewrite this part to simplify calling of operator op = SelectFeatures(self.features_columns) op.schema = self.schema src = op.fit_transform(src) logger.info(f'schema.target_columns: {self.schema.target_columns}') logger.info(f'schema.features_columns: {self.schema.features_columns}') # save resulted dataframe to experiment object if self.schema.context == "pandas": self.df = src elif self.schema.context == "onetick": self.pipeline_query = src self.df = self.datafeeds[0].run(src) else: raise ValueError(f"Unknown data context: {self.schema.context}") if self.df.empty: raise ValueError("Dataframe is empty") for column in self.schema.target_columns + self.schema.features_columns: assert column in self.df.columns, f'Column {column} is missing in dataframe after pipeline execution' # save hashes of datafeeds for d in ['x_train', 'y_train', 'x_test', 'y_test', 'x_val', 'y_val', 'y_unprocessed']: self.datafeed_hashes[d] = joblib.hash(getattr(self, d)) # check if datafeeds are 
changed since experiment was saved in MLFlow if self.restored_datafeed_hashes is not None: if self.restored_datafeed_hashes != self.datafeed_hashes: logger.warning( "Current datafeed hashes are not equal with experiment saved in MLFlow")
@property def x_train(self): return self.df.loc[self.df['__SPLIT'] == 'TRAIN', self.schema.features_columns] @property def y_train(self): return self.df.loc[self.df['__SPLIT'] == 'TRAIN', self.schema.target_columns] @property def x_test(self): return self.df.loc[self.df['__SPLIT'] == 'TEST', self.schema.features_columns] @property def y_test(self): return self.df.loc[self.df['__SPLIT'] == 'TEST', self.schema.target_columns] @property def x_val(self): return self.df.loc[self.df['__SPLIT'] == 'VAL', self.schema.features_columns] @property def y_val(self): return self.df.loc[self.df['__SPLIT'] == 'VAL', self.schema.target_columns] @property def group_column(self): return self.df.loc[:, [self.schema.group_column_name]] @property def x_processed(self): return self.df.loc[:, self.schema.features_columns] @property def y_processed(self): return self.df.loc[:, self.schema.target_columns] @property def y_unprocessed(self): tmp_df = self.df.loc[self.df['__SPLIT'] == 'TEST', self.schema.unprocessed_target_columns] tmp_df = tmp_df.rename(columns={col: col.replace( '__UNPROCESSED.', '') for col in tmp_df.columns}) return tmp_df # TODO Implement features_importance func def features_importance(self, x_train=None, x_val=None, y_train=None, y_val=None, method='catboost_fi'): # pragma: no cover pass
[docs] def init_fit(self, train_params: Union[dict, None] = None, remote_mode: bool = False): """ Function of training models. It includes the model initialization and model fitting steps. In addition, at this stage, the logic of overfitting control, validation (including cross-validation), and hyperparameter tuning is implemented. Parameters ---------- train_params: dict, optional loss: str, optional One of {'MAE', 'RMSE', 'MAPE', 'MSLE'}. The loss function that is used to train the model. Default value is 'RMSE'. Important! The loss function can also be set in the `init_params` of the model, in which case it overrides the `loss` parameter. overfitting: dict, optional Parameters to limit the overfitting of the model during training. It includes: eval_metric: str One of {'MAE', 'RMSE', 'MAPE', 'R2', 'MSLE'}. The metric calculated on the validation set to control overfitting (determining the best iteration of model training, early stopping of model training). Default value is 'MAE'. early_stopping_rounds: int The maximum number of training iterations that occurs without improving the quality of model prediction on the validation sample. If the `eval_metric` does not improve during the `early_stopping_rounds` iterations, then the model training is completed. If `early_stopping_rounds` is not set or equal to zero, then early stopping is disabled and the model will be trained for the full number of iterations. Default value is 0. search_cv: dict, optional Validation and hyperparameter tuning settings. It includes: val_type: str The type of validation, one of {'Simple', 'Cross', 'WalkForward'}. Default is 'Simple'. folds: int The number of folds (used only for 'Cross' and 'WalkForward' validation). Default is 5. eval_metric: str One of {'MAE', 'RMSE', 'MAPE', 'R2', 'MSLE'}. The metric calculated on validation samples in order to determine the best set of hyperparameters when tuning them. 
In addition, when using cross and walk-forward validation, metric calculation results can be obtained using the `experiment.gscv_model.cv_results_` attribute (for example, to estimate the error variance). Default is 'MAE'. search_optimization: str Hyperparameter search method, one of {'grid', 'random', 'bayesian', 'bohb', 'hyperopt', 'none'} If value is `'none'`, then only first params combination used to train model. Default is 'grid'. n_trials: int Number of search iterations, used for 'random', 'bayesian', 'bohb', 'hyperopt' types. Default is 10. remote_mode: bool, optional If True, then using joblib with ray backand to parallelize processes on remote Ray workers|cluster. If False, then using joblib with threading backend to parallelize processes locally. Default value is False. Returns ------- result: onetick.ml.impl.models.RegressorModel Dataframe with data that will participate in the subsequent stages of the experiment pipeline. RegressorModel attributes: experiment.current_model_params: dict the best model parameters found as a result of tuning hyperparameters (when tuning is disabled, the first element from the list of values is selected for each hyperparameter) experiment.gscv_model: GridSearchCV/RandomizedSearchCV Scikit-learn GridSearchCV/RandomizedSearchCV model object experiment.model_result: object result of GridSearchCV/RandomizedSearchCV model `fit()` function experiment.dsf_model: object DS Framework model object, the `model` attribute of this object stores the original trained ML model (e.g. XGBoost or sklearn model). 
""" assert self.models, "Experiment.models must be defined to run .init_fit()" tune_metric_map = {'R2': 'r2', 'MAE': 'neg_mean_absolute_error', 'RMSE': 'neg_root_mean_squared_error', 'MSLE': 'neg_mean_squared_log_error', 'MAPE': 'neg_mean_absolute_percentage_error'} train_params = self.train_params if train_params is None else train_params general_train_params = {'loss': train_params.get('loss') or 'RMSE', 'verbose': train_params.get('verbose', DEFAULT_VERBOSE)} overfitting_params = train_params.get('overfitting') or {} search_cv_params: dict = train_params.get('search_cv') or {} search_optimization = search_cv_params.get('search_optimization', 'grid') scoring = tune_metric_map[search_cv_params.get('eval_metric', 'MAE')] val_type = search_cv_params.get('val_type', 'Simple') x_train = self.x_train y_train = self.y_train x_val = self.x_val y_val = self.y_val is_val_set_empty = False if x_val.empty or y_val.empty: # pragma: no cover logger.warning('Validation sample is empty, so validation is disabled.') is_val_set_empty = True dsf_models = {} models_cv_score = {} models_cv_results = pd.DataFrame() model_result = {} for model_num, model in enumerate(self.models): init_params_grid = model.init_params or {} model.is_val_set_empty = is_val_set_empty if search_optimization == 'none': model_params = list(params_iterator(init_params_grid))[0] init_params_grid = {k: [v] for k, v in model_params.items()} logger.warning( f'Optimization is disabled, the first combination of parameters are used: {model_params}') # init model dsf_params = {**overfitting_params, **general_train_params} dsf_model = model dsf_model.init_model(dsf_params, init_params_grid) # todo it needs to change simple validation cv = create_folds( x_train=x_train, x_val=x_val, y_val=y_val, # val_size=self.splitters[0].val_size, # test_size=self.splitters[0].test_size, val_type=val_type, folds_num=search_cv_params.get('folds', 5) ) # TODO turn off refit for None and Simple?? 
# init *SearchCV logger.info(f"Training model: {model.name}. <dsf_params>: {dsf_params} " f"<init_params>: {init_params_grid} <fit_params>:{model.fit_params}") sklearn_params = dict( # early_stopping=search_cv_params['early_stopping'], scoring=scoring, cv=cv, refit=False, verbose=self.train_params.get('verbose', DEFAULT_VERBOSE), ) if search_optimization in ['grid', 'none']: dsf_model.sklearn_searcher = GridSearchCV( dsf_model.model, init_params_grid, **sklearn_params) elif search_optimization == 'random': sklearn_params['n_iter'] = search_cv_params.get('n_trials', 10) dsf_model.sklearn_searcher = RandomizedSearchCV( dsf_model.model, init_params_grid, **sklearn_params) else: raise Exception(f"'{search_optimization}' search_optimization is currently not supported, " "please use 'grid', 'random' or 'none'") eval_set = None if is_val_set_empty else [(x_val, y_val)] joblib_backend_name = "threading" if remote_mode: if ray.is_initialized(): joblib_backend_name = "ray" register_ray() else: raise Exception( "Ray is not initialized, please use ray.init() to use remote_mode=True") if val_type == 'Simple': x = pd.concat([x_train, x_val], axis=0) y = pd.concat([y_train, y_val], axis=0) else: x = x_train y = y_train with joblib.parallel_backend(joblib_backend_name, n_jobs=self.train_params.get("n_jobs", -1)): # model fit + hyper-parameters search (if enabled) model_result[f'{model.name}_id{model_num}'] = dsf_model.fit(x, y, eval_set=eval_set) # prepare and save intermediate results dsf_models[f'{model.name}_id{model_num}'] = dsf_model models_cv_score[f'{model.name}_id{model_num}'] = dsf_model.sklearn_searcher.best_score_ cv_results = pd.DataFrame(dsf_model.sklearn_searcher.cv_results_) cv_results['model'] = f'{model.name}_id{model_num}' models_cv_results = pd.concat([models_cv_results, cv_results], ignore_index=True) models_cv_results = models_cv_results.loc[:, ~ models_cv_results.columns.str.contains('param_')] # prepare final results models_cv_results.insert(0, 'model', 
models_cv_results.pop('model')) models_cv_results = models_cv_results.drop('rank_test_score', axis=1) best_dsf_model = max(models_cv_score, key=models_cv_score.get) dsf_model = dsf_models[best_dsf_model] # save final results self.models_cv_score = models_cv_score self.models_cv_results = models_cv_results self.gscv_model = dsf_model.sklearn_searcher self.current_model_params = dsf_model.sklearn_searcher.best_params_ self.model_result = model_result logger.info(f'Best params: {self.current_model_params}') # refit on best params dsf_model.init_model(dsf_params, self.current_model_params) dsf_model.fit(x_train, y_train, eval_set=eval_set) self.native_model = dsf_model.model self.dsf_model = dsf_model return dsf_model
[docs] def predict(self, x=None, model=None, preproc_reverse=None): """ Function of predicting results using a trained model. Parameters ---------- x: pd.DataFrame, optional Data containing features on the basis of which it needs to make a prediction. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.self.x_processed` (see `prepare_data` function above). model: object (it depends on used model), optional Trained model. If the parameter is not specified, then model obtained at the previous step of an experiment is taken: `experiment.dsf_model` (see `init_fit` function above). preproc_reverse: bool, optional Enable/disable reverse data processing. In case the value is True, if processing was applied to the target values at the data preparation stage, then reverse processing is applied to the received predicted values. Default value is True. Returns ------- result: pd.DataFrame Model prediction data. In addition, the results obtained are stored in the attributes of the experiment class: experiment.prediction - the original prediction of the model experiment.prediction_reverse_processed - prediction of the model after applying reverse processing, in the case when `preproc_reverse = False`, the parameter value is None """ if model is None: model = self.dsf_model if x is None: if self.x_test is not None and not self.x_test.empty: x = self.x_test else: raise ValueError('The `predict()` function uses a test sample by default (`x = exp.x_test`), ' 'but it seems to be empty. 
Please, adjust your splitters to fill test sample,' 'or specify the `x` parameter explicitly.') if preproc_reverse is None: preproc_reverse = self.prediction_params.get('preprocessing_reverse', True) self.preproc_reverse = preproc_reverse result = model.predict(x) prediction = pd.DataFrame(result, index=x.index, columns=self.schema.target_columns) self.prediction = prediction # add column with values used for grouping (by symbol in most cases) if self.schema.group_column_name is not None: prediction = prediction.join(self.group_column) # run reverse preprocessing if needed if preproc_reverse: # add utility columns to prediction, if they were used in reverse transformation prediction = prediction.join(self.df[self.schema.utility_columns]) # run pipeline in reverse order for preproc in self.pipeline[::-1]: if hasattr(preproc, 'inverse_transform'): prediction = preproc.inverse_transform(prediction_df=prediction) self.prediction_reverse_processed = prediction.loc[:, self.schema.target_columns] return self.prediction_reverse_processed # or just return prediction without reverse preprocessing return self.prediction
def _calc_metrics(self, y=None, prediction=None): metrics = {} for idx, column in enumerate(y.columns): for evaluator in self.evaluators: metric_value = evaluator.evaluate(y.iloc[:, idx], prediction.iloc[:, idx]) metrics[f'{column}_{evaluator.name}'] = metric_value return metrics
[docs] def calc_metrics(self, y=None, prediction=None, group_by_column: bool = False): """ Function of calculating metrics. Parameters ---------- y: pd.DataFrame, optional Target data. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.y_unprocessed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict` function above) `experiment.y_processed` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict` function above) Obviously, only the indices present in the prediction are selected from the target data. If `group_by_column` is True, then `y` must contain the column specified in `Experiment.group_column_name`. prediction: pd.DataFrame, optional Model prediction. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.prediction_reverse_processed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict` function above) `experiment.prediction` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict` function above) group_by_column: bool, optional Apply grouping by column during metric calculation. Each metric is calculated for each group separately. If the parameter is False, then the data is not grouped. Resulting metrics are returned as a dictionary of dictionaries. Default value is False. Returns ------- result: dict or dict of dict Calculated metric values in dict. If `group_by_column` is True, then the result is a dictionary of dictionaries. 
""" if y is None: y = self.y_test if self.preproc_reverse: y = self.y_unprocessed.loc[self.prediction_reverse_processed.index] if prediction is None: if self.preproc_reverse: prediction = self.prediction_reverse_processed[self.schema.target_columns] else: prediction = self.prediction[self.schema.target_columns] if prediction.empty or y.empty: # pragma: no cover logger.warning('Sample is empty, there is no data for calculating metrics.') return None if group_by_column is False: metrics = self._calc_metrics(y, prediction) self.current_metrics = metrics return metrics else: metrics = {} _prediction = prediction if self.schema.group_column_name not in prediction.columns: if self.schema.group_column_name not in self.df: _prediction = _prediction.join(y[self.schema.group_column_name], how='inner') else: _prediction = _prediction.join(self.group_column, how='inner') for group, group_df in _prediction.groupby(self.schema.group_column_name): _metrics = self._calc_metrics( y=y.loc[group_df.index, ~y.columns.isin([self.schema.group_column_name])], prediction=_prediction.loc[ group_df.index, ~_prediction.columns.isin([self.schema.group_column_name]) ] ) metrics[group] = _metrics self.current_metrics = metrics return metrics
[docs] def calc_baseline(self, y: pd.DataFrame = None, group_by_column: bool = False) -> dict: """ Function of building a baseline model and calculating its metrics. The following model is used as a baseline: the current predicted value of the time series is equal to the previous actual value of the time series. Parameters ---------- y: pd.DataFrame, optional Target data. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.y_unprocessed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict` function above) `experiment.y_processed` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict` function above) Obviously, only the indices present in the prediction are selected from the target data. group_by_column: bool, optional Apply grouping by column. If the parameter is False, then the data is not grouped. Each metric is calculated for each group separately. Resulting metrics are returned as a dictionary of dictionaries. Default value is False. Returns ---------- result: dict Calculated metric values of the baseline model. If `group_by_column` is specified, then the result is a dictionary of dictionaries. 
""" if y is None: if self.preproc_reverse: y = self.y_unprocessed.loc[self.prediction_reverse_processed.index].copy() y.columns = y.columns.str.replace('__UNPROCESSED.', '', regex=False) else: y = self.y_processed.loc[self.prediction.index] if not group_by_column and not self.schema.group_column_name: y_shifted = y.shift(1).dropna() metrics = self._calc_metrics(y.loc[y_shifted.index], y_shifted) return metrics if self.schema.group_column_name in y.columns: y_with_group = y else: y_with_group = y.join(self.group_column) y_without_group = y_with_group.loc[:, ~y_with_group.columns.isin( [self.schema.group_column_name])] if not group_by_column: y_shifted = y_with_group.groupby(self.schema.group_column_name).shift(1).dropna() metrics = self._calc_metrics(y_without_group.loc[y_shifted.index], y_shifted) else: metrics = {} for group, group_df in y_with_group.groupby(self.schema.group_column_name): y_shifted = group_df.shift(1).dropna() _metrics = self._calc_metrics(y_without_group.loc[y_shifted.index], y_shifted) metrics[group] = _metrics return metrics
[docs] def prediction_intervals_onestep(self, y: Optional[pd.DataFrame] = None, prediction: Optional[pd.DataFrame] = None, z_value: float = 1.96) -> pd.DataFrame: """ Function of calculating one-step prediction interval using the standard deviation of the residuals. See: https://otexts.com/fpp3/prediction-intervals.html#one-step-prediction-intervals Parameters ---------- y: pd.DataFrame, optional Target data. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.y_unprocessed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict` function above) `experiment.y_processed` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict` function above) Obviously, only the indices present in the prediction are selected from the target data. prediction: pd.DataFrame, optional Model prediction. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.prediction_reverse_processed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict` function above) `experiment.prediction` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict` function above) z_value: float, optional Z value for confidence interval. Default value is 1.96. Returns ---------- result: pd.DataFrame Calculated prediction intervals. 
""" if y is None: if self.preproc_reverse: y = self.y_unprocessed.loc[self.prediction_reverse_processed.index] else: y = self.y_processed.loc[self.prediction.index] if prediction is None: if self.preproc_reverse: prediction = self.prediction_reverse_processed else: prediction = self.prediction if prediction.empty or y.empty: # pragma: no cover logger.warning('Sample is empty, there is no data for calculating metrics.') return None confidence = OneStepPredictionIntervals() delta_vals = confidence.evaluate(y=y, prediction=prediction, z_value=z_value) pi_df = pd.DataFrame() for column in y.columns: pi_df[f'{column}_LOWER_BORDER'] = prediction[column] - delta_vals[f'{column}_DELTA'] pi_df[f'{column}_UPPER_BORDER'] = prediction[column] + delta_vals[f'{column}_DELTA'] return pi_df
# TODO it needs to finalize BootstrapPredictionIntervals def prediction_intervals_bootstrap(self, experiment=None): # pragma: no cover if experiment is None: experiment = self confidence = BootstrapPredictionIntervals() pi_df = confidence.evaluate(experiment=experiment) return pi_df
[docs] def save_pyfunc_model(self, path: str): """ Save model as pyfunc MLFlow model Parameters ---------- path : str Path to save model. """ native_model_path = os.path.join(path, "native_model") pyfunc_model_path = os.path.join(path, "pyfunc_model") model_signature = self.dsf_model.infer_signature(self.x_test, self.y_test) self.dsf_model.mlflow_register.save_model( self.dsf_model.get_mlflow_model(), native_model_path, signature=model_signature) mlflow.pyfunc.save_model( pyfunc_model_path, python_model=self._pyfunc_wrapper(self.dsf_model), artifacts={ self.dsf_model.model_name: native_model_path } )
[docs] def save_model(self, *args, **kwargs): """Save `Experiment.dsf_model` to disk, using native `save_model()` from used ML library. Pass any `args` and `kwargs`, that native ML library consume. """ self.dsf_model.save_model(*args, **kwargs)
[docs] def load_model(self, dsf_model_class, *args, **kwargs): """Initialize and load model from disk, using native `load_model()` from used ML library. Pass any `args` and `kwargs`, that native ML library consume. Loaded model will be set to `dsf_model` attribute of current experiment instance. Returns ------- any loaded model instance """ self.dsf_model = dsf_model_class self.dsf_model.init_model() model = self.dsf_model.load_model(*args, experiment=self, **kwargs) self.dsf_model.model = model return model
[docs] def run(self, x=None, y=None, remote_mode=False): """Run full experiment cycle, consisting of 3 stages: 1. Data load and preprocessing 2. Model training 3. Evaluation Trained model could be found in `Experiment.dsf_model` attribute. Method will return tuple, containing metrics and predictions calculated on test set, if test_size is greater then 0, or on whole train set otherwise. Parameters ---------- x : pd.DataFrame, optional Input data, by default None y : pd.DataFrame, optional Target data, by default None remote_mode : bool, optional Flag, indicating if experiment should be run remotely on Ray, by default False Returns ------- tuple metrics, predictions """ self.get_data() self.prepare_data() self.init_fit(remote_mode=remote_mode) predictions = self.predict(x=x) metrics = self.calc_metrics(y=y) return metrics, predictions
[docs] def save_mlflow_run(self): """Saves last run to MLFlow and returns run ID. Returns: str: MLFlow run ID. """ self._start_mlflow_run(self.current_model_params) self._mlflow_track_model(self.dsf_model) self._mlflow_track_metrics(self.current_metrics) if self.mlflow_url: experiment_link = f"{self.mlflow_url}/#/experiments/{self.mlflow_run.info.experiment_id}" run_link = f"{experiment_link}/runs/{self.mlflow_run.info.run_id}" logger.info(f"MLFlow experiment link: {experiment_link}") logger.info(f"MLFlow logged run link: {run_link}") run_id = self.mlflow_run.info.run_id self.mlflow_run = None mlflow.end_run() return run_id
def _start_mlflow_run(self, model_params):
    """Start a new MLFlow run and log params, config, data hashes and tags to it.

    If an MLFlow run is still active, a warning is logged, the stored run
    handle is cleared and that run is ended before the new one is started.

    Parameters
    ----------
    model_params : dict
        Model parameters, passed to `mlflow.log_params()`.
    """
    # NOTE(review): the indentation of this method was lost in the source
    # this chunk was taken from. The layout below assumes only
    # `set_tracking_uri` is guarded by `mlflow_url` (mirroring
    # `load_mlflow_model`) - confirm against the upstream file.
    if mlflow.active_run():
        logger.warning(
            "you've not calculated metrics last run, so metrics are not recorded in MLFlow.")
        self.mlflow_run = None
        mlflow.end_run()
    if self.mlflow_url:
        mlflow.set_tracking_uri(self.mlflow_url)
    mlflow.set_experiment(self.experiment_name)
    self.mlflow_run = mlflow.start_run()
    mlflow.log_params(model_params)
    mlflow.log_dict(self.serialize_config(), 'config.yaml')
    mlflow.log_dict(self.datafeed_hashes, 'datahashes.yaml')
    mlflow.set_tags({
        "model_name": self.dsf_model.name,
        "onetickml_version": onetick.ml.__version__,
    })

def _pyfunc_wrapper(self, model):
    """Builds `_ModelWrapper` class for MLFlow to pass as `python_model=` param to `log_model()`.

    The wrapper class captures the experiment's datafeeds, pipeline, schema
    and model wrapper class at the moment this method is called.

    Parameters
    ----------
    model : any
        Model class to wrap (passed only by MLFlow inner functions)

    Returns
    -------
    mlflow.pyfunc.PythonModel
        Instance of the wrapper class based on pyfunc.PythonModel
    """
    # Snapshot experiment state into locals so the nested class body below
    # can bind it as class attributes.
    _datafeeds = self.datafeeds
    _pipeline = self.pipeline
    _dsf_model_class = self.dsf_model.__class__
    _schema = self.schema
    # Read by `_ModelWrapper.predict` through the closure; defaults to True.
    preproc_reverse = self.prediction_params.get('preprocessing_reverse', True)
    _group_column_name = self.schema.group_column_name

    class _ModelWrapper(mlflow.pyfunc.PythonModel, PreparePredictDataMixin):
        # `pipeline` and `schema` are the attributes `PreparePredictDataMixin`
        # reads in `prepare_predict_data()`.
        datafeeds = _datafeeds
        pipeline = _pipeline
        dsf_model_class = _dsf_model_class
        group_column_name = _group_column_name
        schema = _schema

        def load_context(self, context):
            """Load the native model from the artifact stored under `model.model_name`."""
            self.wrapped_model = model.mlflow_register.load_model(
                model_uri=context.artifacts[model.model_name])

        def load_data(self, parameters: Optional[dict] = None):
            """Load data with a new datafeed built from the first experiment datafeed.

            Parameters
            ----------
            parameters : dict, optional
                Overrides for the keyword arguments of the first datafeed.
                String values of `start` / `end` are parsed with
                `datetime.datetime.fromisoformat()`.

            Returns
            -------
            Result of `datafeed.run(...)` after `reset_index()`.
            """
            # use datafeed params as defaults
            kwargs = self.datafeeds[0]._kwargs.copy()
            if parameters:
                kwargs.update(parameters)
            # replace start/end with datetime objects if it's string (ex: from Airflow DAG)
            for key in ["start", "end"]:
                if key in kwargs and isinstance(kwargs[key], str):
                    kwargs[key] = datetime.datetime.fromisoformat(kwargs[key])
            # init new datafeed instance and use it to load data
            datafeed = self.datafeeds[0].__class__(**kwargs)
            src = datafeed.load()
            return datafeed.run(src).reset_index()

        def predict(self, context, model_input):
            """
            Predict method for MLFlow pyfunc model.

            Parameters
            ----------
            context : mlflow.pyfunc.PythonModelContext
                A PythonModelContext instance containing artifacts that the model
                can use to perform inference.
            model_input : pd.DataFrame
                A pyfunc-compatible input for the model to evaluate.

            Returns
            -------
            pd.DataFrame
                Prediction made by wrapped model.
                Reverse processing applied with respect to Experiment params.
            """
            # Work on a deep copy so the caller's frame is left untouched.
            _df = model_input.copy(deep=True)
            x = self.prepare_predict_data(_df)
            # model predict
            dsf_model = self.dsf_model_class()
            dsf_model.model = self.wrapped_model
            result = dsf_model.predict(x)
            prediction = pd.DataFrame(result, index=x.index, columns=self.schema.target_columns)
            # reverse processing
            # NOTE(review): placement of the first `return` (after the loop,
            # inside the `if`) is inferred; original indentation was lost.
            if preproc_reverse:
                # Undo preprocessing steps in reverse pipeline order; only
                # `BasePreprocess` items are inverted.
                for preproc in self.pipeline[::-1]:
                    if isinstance(preproc, BasePreprocess):
                        prediction = preproc.inverse_transform(prediction_df=prediction)
                return prediction.loc[:, self.schema.target_columns]
            return prediction

    return _ModelWrapper()

def _mlflow_track_model(self, model):
    """Log the native model and its pyfunc wrapper to MLFlow.

    Nothing is logged unless `self.log_models` is truthy and the model has a
    truthy `mlflow_register`.

    Parameters
    ----------
    model : any
        Model wrapper providing `mlflow_register`, `model_name`,
        `infer_signature()` and `get_mlflow_model()`.
    """
    if self.log_models and model.mlflow_register:
        model_signature = model.infer_signature(self.x_test, self.y_test)
        model_info = model.mlflow_register.log_model(
            model.get_mlflow_model(),
            artifact_path=model.model_name,
            registered_model_name=model.model_name,
            signature=model_signature)
        # The wrapped model references the native model logged above through
        # its URI, keyed by the model name (see `_ModelWrapper.load_context`).
        mlflow.pyfunc.log_model(
            f"wrapped_{model.model_name}",
            registered_model_name=f"wrapped_{model.model_name}",
            python_model=self._pyfunc_wrapper(model),
            artifacts={
                model.model_name: model_info.model_uri
            }
        )

def _mlflow_track_metrics(self, estimations):
    """Log metrics to MLFlow and end the run, when `_use_mlflow()` is truthy.

    Parameters
    ----------
    estimations : dict
        Metrics, passed to `mlflow.log_metrics()`.
    """
    # NOTE(review): `end_run()` is assumed to sit inside the `if`; original
    # indentation was lost - confirm against the upstream file.
    if self._use_mlflow():
        mlflow.log_metrics(estimations)
        mlflow.end_run()
def load_mlflow_model(self, run_id: str):
    """Load model from MLFlow Registry from run having specified `run_id`.
    Resulting model will be set to `dsf_model` attribute of current experiment instance.

    The model wrapper is picked from `self.models` by comparing each
    `model_name` with the run's `model_name` tag, initialized with the
    experiment train params plus the params logged in the run, and then the
    native model is loaded from the run artifacts.

    Args:
        run_id (str): ID of the MLFlow run to load the model from.

    Returns:
        model: loaded model instance

    Raises:
        AssertionError: if no model in `self.models` matches the run's
            `model_name` tag.
    """
    if self.mlflow_url:
        mlflow.set_tracking_uri(self.mlflow_url)

    client = MlflowClient()
    run = client.get_run(run_id)
    init_params = run.data.params

    # First model whose name matches the tag, or None when nothing matches.
    self.dsf_model = next(
        (candidate for candidate in self.models
         if candidate.model_name == run.data.tags['model_name']),
        None,
    )
    # Raised explicitly instead of using `assert`: assert statements are
    # removed under `python -O`, which would turn this into an AttributeError
    # on None further down. The exception type is kept for existing callers.
    if self.dsf_model is None:
        raise AssertionError("Model with specified name not found in experiment models list")

    # initialize dsf_model with params from run
    overfitting_params = self.train_params.get('overfitting', {})
    if overfitting_params is None:
        overfitting_params = {}
    general_train_params = {
        'loss': self.train_params.get('loss') or 'RMSE',
        'verbose': self.train_params.get('verbose') or DEFAULT_VERBOSE,
    }
    # General train params win over overfitting params on key clashes.
    dsf_params = {**overfitting_params, **general_train_params}
    self.dsf_model.init_model(dsf_params, init_params)

    # load model from MLFlow run
    self.dsf_model.model = self.dsf_model.mlflow_register.load_model(
        f"runs:/{run_id}/{self.dsf_model.name}")
    return self.dsf_model