import os
import copy
import ray
import joblib
import ray
import mlflow
import datetime
import pandas as pd
import onetick.py as otp
from functools import reduce
from typing import Optional, Union
from mlflow.tracking import MlflowClient
from ray.util.joblib import register_ray
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
import onetick.ml
from onetick.ml.impl.data_pipelines.data_pipeline import SelectFeatures, SelectTargets
from onetick.ml.impl.data_pipelines.ot_pipeline import ToPandas
from onetick.ml.utils.schema import DataSchema
from onetick.ml.impl.evaluators import (BootstrapPredictionIntervals,
OneStepPredictionIntervals)
from onetick.ml.interfaces import BaseExperiment
from onetick.ml.interfaces.data_pipelines import BaseFeatures, BaseFilter, BasePreprocess
from onetick.ml.utils import create_folds, logger, params_iterator
# Verbosity level used by `Experiment.init_fit()` when the train params have no 'verbose' entry.
DEFAULT_VERBOSE = 0
class PreparePredictDataMixin:
    """
    Mixin that lets experiments and wrapped models turn raw data into model input.

    The host class is expected to provide `pipeline` and `schema` attributes.
    """

    def prepare_predict_data(self, df: pd.DataFrame):
        """
        Run the processing pipeline over ``df`` and keep only the feature columns.

        String entries of the pipeline are registered in the schema as feature
        names. Every other entry gets the shared schema assigned, is re-fitted
        only when its ``refit_on_predict`` flag is truthy, and is then applied
        through ``transform``. Target columns are not selected here.

        Parameters
        ----------
        df : pd.DataFrame
            Plain data to prepare for prediction.
            Format is expected to be the same as comes from datafeed.

        Returns
        -------
        x : pd.DataFrame
            Prepared data (features only), suitable to be passed in
            `Experiment.predict()` method.
        """
        prepared = df
        for step in self.pipeline:
            if not isinstance(step, str):
                step.schema = self.schema
                if step.refit_on_predict:
                    prepared = step.fit(prepared)
                prepared = step.transform(prepared)
            else:
                # plain strings are feature names, not operators
                self.schema.add_features(step)
        return prepared.loc[:, self.schema.features_columns]
[docs]class Experiment(BaseExperiment, PreparePredictDataMixin):
"""
Implements full cycle to run experiment.
"""
# General settings; `__init__` reads 'experiment_name', 'log_models' and 'mlflow_url' from this dict.
general = dict(
experiment_name='default_experiment',
log_models=False,
mlflow_url=None,
)
# Data sources: `get_data()` calls `.load()` on each of them and merges the results.
datafeeds = []
# Splitters: `prepare_data()` uses only the first entry.
splitters = []
# Processing steps: an entry is either an operator or a plain string treated as a feature name.
pipeline = []
# Model wrappers that `init_fit()` trains one after another.
models = []
# Evaluators applied to every target column when metrics are calculated.
evaluators = []
# Both column lists must be non-empty, otherwise `__init__` raises RuntimeError.
features_columns = []
target_columns = []
# Optional grouping column; `__init__` copies it into the shared schema.
group_column_name = None
# train params
train_params = dict(
loss='RMSE',
verbose=1,
overfitting={},
search_cv={},
n_jobs=-1,
)
# prediction and evaluation params
# 'preprocessing_reverse' is the default for the `preproc_reverse` argument of `predict()`.
prediction_params = {'preprocessing_reverse': True}
def __init__(self):
    """
    Create empty experiment state and check that columns are declared.

    Raises
    ------
    RuntimeError
        If `features_columns` or `target_columns` is empty.
    """
    # datafeeds and splitters are not validated here (the asserts for them are disabled)

    # containers that start out empty
    self.df = pd.DataFrame()
    self.results = {}
    self.datafeed_hashes = {}

    # state that is filled in by the later stages of the experiment
    self.src = self.pipeline_query = None
    self.prediction = self.prediction_reverse_processed = None
    self._prediction_input = None
    self.mlflow_run = None
    self.dsf_model = self.gscv_model = self.native_model = None
    self.models_cv_score = self.models_cv_results = None
    self.current_model_params = self.model_signature = self.model_result = None
    self.preproc_reverse = self.current_metrics = None
    self.restored_datafeed_hashes = None
    self.wrapped_model = None

    # configuration dicts always end up as instance attributes
    self.general: dict = self.general if self.general is not None else {}
    self.train_params: dict = self.train_params if self.train_params is not None else {}

    general = self.general
    self.experiment_name: str = general.get('experiment_name', 'default_experiment')
    self.log_models = general.get('log_models', False)
    self.mlflow_url = general.get('mlflow_url', None)

    # schema object shared with datafeeds, splitters and pipeline operators
    schema = DataSchema()
    schema.group_column_name = self.group_column_name
    self.schema = schema

    if not self.features_columns:
        raise RuntimeError("No features columns selected. "
                           "List features columns in Experiment.features_columns attribute.")
    if not self.target_columns:
        raise RuntimeError("No target columns selected. "
                           "List target columns in Experiment.target_columns attribute.")
def _use_mlflow(self) -> bool:
"""Boolean showing are able to use MLFlow."""
return bool(self.mlflow_url and self.experiment_name)
[docs] def serialize_config(self):
"""Dump current instance config to `dict`,
that could be used to reconstruct class by `build_experiment_class()`.
"""
config = {}
for attr in ['general', 'group_column_name', 'train_params', 'features_columns', 'target_columns']:
config[attr] = getattr(self, attr)
for attr in ['datafeeds', 'splitters', 'pipeline', "models", "evaluators"]:
obj_list = []
for obj in getattr(self, attr):
obj_cfg = copy.deepcopy(obj._init_params)
obj_list.append(obj_cfg)
config[attr] = obj_list
config['prediction'] = {
'preprocessing_reverse': self.prediction_params['preprocessing_reverse'],
}
return config
# TODO: Should we always work with dataframes that use the Time column as the index?
[docs] def get_data(self, datetime_column: Optional[str] = None) -> Union[pd.DataFrame, otp.Source]:
"""
Function of loading external data (e.g. onetick or csv).
If an experiment is set to use of several data feeds, then they are merged by index or
by the `datetime_column` (if it is specified).
Before returning the result, `reset_index` is applied to the dataframe, since in the current version of
the framework, preprocessing is based on a numerical index, and not on a datetime field.
Parameters
----------
datetime_column: str, optional
The field by which the data is merged if the experiment contains several data feeds.
Returns
----------
result: pd.DataFrame
Dataframe with data that will participate in the subsequent stages of the experiment pipeline
(`prepare_data()` function).
Important! `reset_index()` was applied to the data (see the description of the function above).
The result is also stored in the attribute of an instance of the class Experiment: experiment.df
"""
sources = []
for datafeed in self.datafeeds:
datafeed.schema = self.schema
src = datafeed.load()
sources.append(src)
if len(sources) > 1 and isinstance(sources[0], otp.Source):
src = otp.merge(sources)
elif len(sources) > 1 and isinstance(sources[0], pd.DataFrame):
on_column = None
left_index_column = False
right_index_column = False
if len(sources) > 1:
if datetime_column is None:
left_index_column = True
right_index_column = True
else:
on_column = datetime_column
src = reduce(lambda left, right: pd.merge(left,
right,
on=on_column,
left_index=left_index_column,
right_index=right_index_column), sources)
elif len(sources) < 1:
raise ValueError("No datafeeds were specified")
if isinstance(src, pd.DataFrame):
src = src.reset_index()
self.src = src
return src
def prepare_data(self, src: Optional[Union[pd.DataFrame, otp.Source]] = None):
    """
    Prepare data for feeding into a machine learning model.

    Steps: sample splitting, saving of the original target values, running the
    processing pipeline, and feature selection.

    Parameters
    ----------
    src: pd.DataFrame or otp.Source, optional
        Data to prepare.
        If the parameter is not specified, a deep copy of the data obtained at
        the previous step is taken (`experiment.src`, set by `get_data()`).

    Results
    -------
    The prepared dataframe is stored in `experiment.df`; the following
    properties are slices of it:
        experiment.x_train
        experiment.x_val
        experiment.x_test
        experiment.y_train
        experiment.y_val
        experiment.y_test
    In addition, you can refer to:
        experiment.y_unprocessed - dataframe with not preprocessed (original)
        target columns, used to calculate metrics

    Raises
    ------
    ValueError
        If `ToPandas()` is used in pandas context, if the dataframe becomes
        empty after a pipeline operator or at the end, or if the data context
        is unknown.
    AssertionError
        If a target or feature column is missing after pipeline execution.
    """
    if src is None:
        # TODO copy is needed only for case with double call of prepare_data (is it still needed?)
        src = self.src.copy(deep=True)

    # source type defines context for schema and pipeline
    if isinstance(src, pd.DataFrame):
        self.schema.context = 'pandas'
    elif isinstance(src, otp.Source):
        self.schema.context = 'onetick'

    # share schema object between all operators
    for operator in self.datafeeds + self.splitters + self.pipeline:
        if isinstance(operator, str):
            continue
        operator.schema = self.schema

    # split data or mark all as train;
    # an empty `splitters` list (the class default) must not raise IndexError
    splitter = self.splitters[0] if self.splitters else None
    if splitter:
        src = splitter.transform(src)
    else:
        src["__SPLIT"] = "TRAIN"

    # copy unprocessed target columns to separate columns with prefix "__UNPROCESSED.",
    # to save original values before running processing pipeline.
    # these values will be needed to calculate metrics on reverse preprocessed data.
    self.schema.unprocessed_target_columns = []
    op = SelectTargets(self.target_columns)
    op.schema = self.schema
    src = op.fit_transform(src)

    # run the whole processing pipeline
    for proc in self.pipeline:
        # treat strings as feature names
        if isinstance(proc, str):
            self.schema.add_features(proc)
            continue
        # treat ToPandas() as context switch from onetick to pandas
        if isinstance(proc, ToPandas):
            if self.schema.context == "pandas":
                raise ValueError("ToPandas() operator cannot be used in pandas context")
            self.schema.context = "pandas"
            self.pipeline_query = src
            src = self.datafeeds[0].run(src)
            continue

        # run operator
        src = proc.fit(src)
        src = proc.transform(src)

        # check if dataframe is empty, and raise error if it is
        # useful for debugging pipelines
        if self.schema.context == "pandas" and src.empty:
            raise ValueError(f"DataFrame become empty on operator: {proc}")

    # TODO rewrite this part to simplify calling of operator
    op = SelectFeatures(self.features_columns)
    op.schema = self.schema
    src = op.fit_transform(src)

    logger.info(f'schema.target_columns: {self.schema.target_columns}')
    logger.info(f'schema.features_columns: {self.schema.features_columns}')

    # save resulted dataframe to experiment object
    if self.schema.context == "pandas":
        self.df = src
    elif self.schema.context == "onetick":
        self.pipeline_query = src
        self.df = self.datafeeds[0].run(src)
    else:
        raise ValueError(f"Unknown data context: {self.schema.context}")

    if self.df.empty:
        raise ValueError("Dataframe is empty")

    for column in self.schema.target_columns + self.schema.features_columns:
        assert column in self.df.columns, f'Column {column} is missing in dataframe after pipeline execution'

    # save hashes of datafeeds
    for d in ['x_train', 'y_train', 'x_test', 'y_test', 'x_val', 'y_val', 'y_unprocessed']:
        self.datafeed_hashes[d] = joblib.hash(getattr(self, d))

    # check if datafeeds are changed since experiment was saved in MLFlow
    if self.restored_datafeed_hashes is not None:
        if self.restored_datafeed_hashes != self.datafeed_hashes:
            logger.warning(
                "Current datafeed hashes are not equal with experiment saved in MLFlow")
@property
def x_train(self):
return self.df.loc[self.df['__SPLIT'] == 'TRAIN', self.schema.features_columns]
@property
def y_train(self):
return self.df.loc[self.df['__SPLIT'] == 'TRAIN', self.schema.target_columns]
@property
def x_test(self):
return self.df.loc[self.df['__SPLIT'] == 'TEST', self.schema.features_columns]
@property
def y_test(self):
return self.df.loc[self.df['__SPLIT'] == 'TEST', self.schema.target_columns]
@property
def x_val(self):
return self.df.loc[self.df['__SPLIT'] == 'VAL', self.schema.features_columns]
@property
def y_val(self):
return self.df.loc[self.df['__SPLIT'] == 'VAL', self.schema.target_columns]
@property
def group_column(self):
return self.df.loc[:, [self.schema.group_column_name]]
@property
def x_processed(self):
return self.df.loc[:, self.schema.features_columns]
@property
def y_processed(self):
return self.df.loc[:, self.schema.target_columns]
@property
def y_unprocessed(self):
tmp_df = self.df.loc[self.df['__SPLIT'] == 'TEST', self.schema.unprocessed_target_columns]
tmp_df = tmp_df.rename(columns={col: col.replace(
'__UNPROCESSED.', '') for col in tmp_df.columns})
return tmp_df
# TODO Implement features_importance func
def features_importance(self,
x_train=None,
x_val=None,
y_train=None,
y_val=None,
method='catboost_fi'): # pragma: no cover
pass
def init_fit(self,
             train_params: Union[dict, None] = None,
             remote_mode: bool = False):
    """
    Train all models listed in `Experiment.models` and keep the best one.

    For every model a scikit-learn searcher (GridSearchCV or
    RandomizedSearchCV) is built and fitted; the model with the highest
    `best_score_` is then re-initialized with its best parameters and fitted
    again on the train sample.

    Parameters
    ----------
    train_params: dict, optional
        Training settings; `Experiment.train_params` is used when omitted.
        All settings, including 'verbose' and 'n_jobs', are taken from this
        dict when it is passed.
            loss: str, optional
                One of {'MAE', 'RMSE', 'MAPE', 'MSLE'}. The loss function that is used to train the model.
                Default value is 'RMSE'.
                Important! The loss function can also be set in the `init_params` of the model, in which
                case it overrides the `loss` parameter.
            verbose: int, optional
                Verbosity passed to the model and to the searcher. Default is `DEFAULT_VERBOSE`.
            n_jobs: int, optional
                Number of joblib jobs. Default is -1.
            overfitting: dict, optional
                Parameters to limit the overfitting of the model during training.
                It includes:
                    eval_metric: str
                        One of {'MAE', 'RMSE', 'MAPE', 'R2', 'MSLE'}. The metric calculated on the
                        validation set to control overfitting (determining the best iteration of model
                        training, early stopping of model training).
                        Default value is 'MAE'.
                    early_stopping_rounds: int
                        The maximum number of training iterations that occurs without improving the
                        quality of model prediction on the validation sample.
                        If `early_stopping_rounds` is not set or equal to zero, then early stopping is
                        disabled and the model will be trained for the full number of iterations.
                        Default value is 0.
            search_cv: dict, optional
                Validation and hyperparameter tuning settings.
                It includes:
                    val_type: str
                        The type of validation, one of {'Simple', 'Cross', 'WalkForward'}.
                        Default is 'Simple'.
                    folds: int
                        The number of folds (used only for 'Cross' and 'WalkForward' validation).
                        Default is 5.
                    eval_metric: str
                        One of {'MAE', 'RMSE', 'MAPE', 'R2', 'MSLE'}. The metric calculated on validation
                        samples in order to determine the best set of hyperparameters when tuning them.
                        Results per fold are available in `experiment.gscv_model.cv_results_`.
                        Default is 'MAE'.
                    search_optimization: str
                        Hyperparameter search method. This implementation supports
                        {'grid', 'random', 'none'}; any other value raises an exception.
                        If value is `'none'`, then only first params combination used to train model.
                        Default is 'grid'.
                    n_trials: int
                        Number of search iterations, used for 'random'.
                        Default is 10.
    remote_mode: bool, optional
        If True, then using joblib with ray backend to parallelize processes on remote Ray workers|cluster.
        If False, then using joblib with threading backend to parallelize processes locally.
        Default value is False.

    Returns
    -------
    result: object
        The best DS Framework model wrapper (one of `Experiment.models`), refitted on the best parameters.
        The following attributes of the experiment are set as well:
            experiment.current_model_params: dict
                the best model parameters found as a result of tuning hyperparameters
                (when tuning is disabled, the first element from the list of values
                is selected for each hyperparameter)
            experiment.gscv_model: GridSearchCV/RandomizedSearchCV
                Scikit-learn searcher of the best model
            experiment.model_result: dict
                results of `fit()` for every trained model
            experiment.models_cv_score / experiment.models_cv_results:
                best score per model / concatenated `cv_results_` of all models
            experiment.dsf_model: object
                DS Framework model object, the `model` attribute of this object stores the
                original trained ML model (e.g. XGBoost or sklearn model);
                that native model is also stored in `experiment.native_model`.

    Raises
    ------
    AssertionError
        If `Experiment.models` is empty.
    Exception
        If `search_optimization` is not supported, or `remote_mode` is True while Ray is not initialized.
    """
    assert self.models, "Experiment.models must be defined to run .init_fit()"

    tune_metric_map = {'R2': 'r2',
                       'MAE': 'neg_mean_absolute_error',
                       'RMSE': 'neg_root_mean_squared_error',
                       'MSLE': 'neg_mean_squared_log_error',
                       'MAPE': 'neg_mean_absolute_percentage_error'}

    train_params = self.train_params if train_params is None else train_params
    # `verbose` and `n_jobs` come from the effective params, so an explicit
    # `train_params` argument is honored everywhere
    verbose = train_params.get('verbose', DEFAULT_VERBOSE)
    n_jobs = train_params.get('n_jobs', -1)
    general_train_params = {'loss': train_params.get('loss') or 'RMSE',
                            'verbose': verbose}
    overfitting_params = train_params.get('overfitting') or {}
    search_cv_params: dict = train_params.get('search_cv') or {}
    search_optimization = search_cv_params.get('search_optimization', 'grid')
    scoring = tune_metric_map[search_cv_params.get('eval_metric', 'MAE')]
    val_type = search_cv_params.get('val_type', 'Simple')

    x_train = self.x_train
    y_train = self.y_train
    x_val = self.x_val
    y_val = self.y_val

    is_val_set_empty = False
    if x_val.empty or y_val.empty:  # pragma: no cover
        logger.warning('Validation sample is empty, so validation is disabled.')
        is_val_set_empty = True

    # values that do not depend on the particular model
    dsf_params = {**overfitting_params, **general_train_params}
    eval_set = None if is_val_set_empty else [(x_val, y_val)]

    joblib_backend_name = "threading"
    if remote_mode:
        if not ray.is_initialized():
            raise Exception(
                "Ray is not initialized, please use ray.init() to use remote_mode=True")
        joblib_backend_name = "ray"
        register_ray()

    if val_type == 'Simple':
        # the searcher gets train and validation rows together
        x = pd.concat([x_train, x_val], axis=0)
        y = pd.concat([y_train, y_val], axis=0)
    else:
        x = x_train
        y = y_train

    dsf_models = {}
    models_cv_score = {}
    models_cv_results = pd.DataFrame()
    model_result = {}

    for model_num, model in enumerate(self.models):
        model_key = f'{model.name}_id{model_num}'
        init_params_grid = model.init_params or {}
        model.is_val_set_empty = is_val_set_empty

        if search_optimization == 'none':
            model_params = list(params_iterator(init_params_grid))[0]
            init_params_grid = {k: [v] for k, v in model_params.items()}
            logger.warning(
                f'Optimization is disabled, the first combination of parameters are used: {model_params}')

        # init model
        dsf_model = model
        dsf_model.init_model(dsf_params, init_params_grid)

        # todo it needs to change simple validation
        cv = create_folds(
            x_train=x_train,
            x_val=x_val,
            y_val=y_val,
            val_type=val_type,
            folds_num=search_cv_params.get('folds', 5)
        )

        # TODO turn off refit for None and Simple??
        # init *SearchCV
        logger.info(f"Training model: {model.name}. <dsf_params>: {dsf_params} "
                    f"<init_params>: {init_params_grid} <fit_params>:{model.fit_params}")
        sklearn_params = dict(
            scoring=scoring,
            cv=cv,
            refit=False,
            verbose=verbose,
        )
        if search_optimization in ['grid', 'none']:
            dsf_model.sklearn_searcher = GridSearchCV(
                dsf_model.model,
                init_params_grid,
                **sklearn_params)
        elif search_optimization == 'random':
            sklearn_params['n_iter'] = search_cv_params.get('n_trials', 10)
            dsf_model.sklearn_searcher = RandomizedSearchCV(
                dsf_model.model,
                init_params_grid,
                **sklearn_params)
        else:
            raise Exception(f"'{search_optimization}' search_optimization is currently not supported, "
                            "please use 'grid', 'random' or 'none'")

        with joblib.parallel_backend(joblib_backend_name, n_jobs=n_jobs):
            # model fit + hyper-parameters search (if enabled)
            model_result[model_key] = dsf_model.fit(x, y, eval_set=eval_set)

        # prepare and save intermediate results
        dsf_models[model_key] = dsf_model
        models_cv_score[model_key] = dsf_model.sklearn_searcher.best_score_
        cv_results = pd.DataFrame(dsf_model.sklearn_searcher.cv_results_)
        cv_results['model'] = model_key
        models_cv_results = pd.concat([models_cv_results, cv_results], ignore_index=True)

    # prepare final results: drop per-parameter columns, put 'model' first
    models_cv_results = models_cv_results.loc[:, ~models_cv_results.columns.str.contains('param_')]
    models_cv_results.insert(0, 'model', models_cv_results.pop('model'))
    models_cv_results = models_cv_results.drop('rank_test_score', axis=1)
    best_dsf_model = max(models_cv_score, key=models_cv_score.get)
    dsf_model = dsf_models[best_dsf_model]

    # save final results
    self.models_cv_score = models_cv_score
    self.models_cv_results = models_cv_results
    self.gscv_model = dsf_model.sklearn_searcher
    self.current_model_params = dsf_model.sklearn_searcher.best_params_
    self.model_result = model_result

    logger.info(f'Best params: {self.current_model_params}')

    # refit on best params
    dsf_model.init_model(dsf_params, self.current_model_params)
    dsf_model.fit(x_train,
                  y_train,
                  eval_set=eval_set)
    self.native_model = dsf_model.model
    self.dsf_model = dsf_model

    return dsf_model
[docs] def predict(self, x=None, model=None, preproc_reverse=None):
"""
Function of predicting results using a trained model.
Parameters
----------
x: pd.DataFrame, optional
Data containing features on the basis of which it needs to make a prediction.
If the parameter is not specified, then data obtained at the previous steps of an experiment is taken:
`experiment.self.x_processed` (see `prepare_data` function above).
model: object (it depends on used model), optional
Trained model.
If the parameter is not specified, then model obtained at the previous step of an experiment is taken:
`experiment.dsf_model` (see `init_fit` function above).
preproc_reverse: bool, optional
Enable/disable reverse data processing. In case the value is True, if processing was applied to the target
values at the data preparation stage, then reverse processing is applied to the received predicted values.
Default value is True.
Returns
-------
result: pd.DataFrame
Model prediction data.
In addition, the results obtained are stored in the attributes of the experiment class:
experiment.prediction - the original prediction of the model
experiment.prediction_reverse_processed - prediction of the model after applying reverse processing, in
the case when `preproc_reverse = False`, the parameter value is None
"""
if model is None:
model = self.dsf_model
if x is None:
if self.x_test is not None and not self.x_test.empty:
x = self.x_test
else:
raise ValueError('The `predict()` function uses a test sample by default (`x = exp.x_test`), '
'but it seems to be empty. Please, adjust your splitters to fill test sample,'
'or specify the `x` parameter explicitly.')
if preproc_reverse is None:
preproc_reverse = self.prediction_params.get('preprocessing_reverse', True)
self.preproc_reverse = preproc_reverse
result = model.predict(x)
prediction = pd.DataFrame(result, index=x.index, columns=self.schema.target_columns)
self.prediction = prediction
# add column with values used for grouping (by symbol in most cases)
if self.schema.group_column_name is not None:
prediction = prediction.join(self.group_column)
# run reverse preprocessing if needed
if preproc_reverse:
# add utility columns to prediction, if they were used in reverse transformation
prediction = prediction.join(self.df[self.schema.utility_columns])
# run pipeline in reverse order
for preproc in self.pipeline[::-1]:
if hasattr(preproc, 'inverse_transform'):
prediction = preproc.inverse_transform(prediction_df=prediction)
self.prediction_reverse_processed = prediction.loc[:, self.schema.target_columns]
return self.prediction_reverse_processed
# or just return prediction without reverse preprocessing
return self.prediction
def _calc_metrics(self, y=None, prediction=None):
metrics = {}
for idx, column in enumerate(y.columns):
for evaluator in self.evaluators:
metric_value = evaluator.evaluate(y.iloc[:, idx], prediction.iloc[:, idx])
metrics[f'{column}_{evaluator.name}'] = metric_value
return metrics
[docs] def calc_metrics(self, y=None, prediction=None, group_by_column: bool = False):
"""
Function of calculating metrics.
Parameters
----------
y: pd.DataFrame, optional
Target data.
If the parameter is not specified, then data obtained at the previous steps of an experiment is taken:
`experiment.y_unprocessed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict`
function above)
`experiment.y_processed` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict`
function above)
Obviously, only the indices present in the prediction are selected from the target data.
If `group_by_column` is True, then `y` must contain the column specified in `Experiment.group_column_name`.
prediction: pd.DataFrame, optional
Model prediction.
If the parameter is not specified, then data obtained at the previous steps of an experiment is taken:
`experiment.prediction_reverse_processed` (see `prepare_data` function above) if `preproc_reverse = True`
(see `predict` function above)
`experiment.prediction` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict`
function above)
group_by_column: bool, optional
Apply grouping by column during metric calculation.
Each metric is calculated for each group separately.
If the parameter is False, then the data is not grouped.
Resulting metrics are returned as a dictionary of dictionaries.
Default value is False.
Returns
-------
result: dict or dict of dict
Calculated metric values in dict.
If `group_by_column` is True, then the result is a dictionary of dictionaries.
"""
if y is None:
y = self.y_test
if self.preproc_reverse:
y = self.y_unprocessed.loc[self.prediction_reverse_processed.index]
if prediction is None:
if self.preproc_reverse:
prediction = self.prediction_reverse_processed[self.schema.target_columns]
else:
prediction = self.prediction[self.schema.target_columns]
if prediction.empty or y.empty: # pragma: no cover
logger.warning('Sample is empty, there is no data for calculating metrics.')
return None
if group_by_column is False:
metrics = self._calc_metrics(y, prediction)
self.current_metrics = metrics
return metrics
else:
metrics = {}
_prediction = prediction
if self.schema.group_column_name not in prediction.columns:
if self.schema.group_column_name not in self.df:
_prediction = _prediction.join(y[self.schema.group_column_name], how='inner')
else:
_prediction = _prediction.join(self.group_column, how='inner')
for group, group_df in _prediction.groupby(self.schema.group_column_name):
_metrics = self._calc_metrics(
y=y.loc[group_df.index, ~y.columns.isin([self.schema.group_column_name])],
prediction=_prediction.loc[
group_df.index,
~_prediction.columns.isin([self.schema.group_column_name])
]
)
metrics[group] = _metrics
self.current_metrics = metrics
return metrics
[docs] def calc_baseline(self, y: pd.DataFrame = None, group_by_column: bool = False) -> dict:
"""
Function of building a baseline model and calculating its metrics.
The following model is used as a baseline:
the current predicted value of the time series is equal to the previous actual value of the time series.
Parameters
----------
y: pd.DataFrame, optional
Target data.
If the parameter is not specified, then data obtained at the previous steps of an experiment is taken:
`experiment.y_unprocessed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict`
function above)
`experiment.y_processed` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict`
function above)
Obviously, only the indices present in the prediction are selected from the target data.
group_by_column: bool, optional
Apply grouping by column. If the parameter is False, then the data is not grouped.
Each metric is calculated for each group separately.
Resulting metrics are returned as a dictionary of dictionaries.
Default value is False.
Returns
----------
result: dict
Calculated metric values of the baseline model.
If `group_by_column` is specified, then the result is a dictionary of dictionaries.
"""
if y is None:
if self.preproc_reverse:
y = self.y_unprocessed.loc[self.prediction_reverse_processed.index].copy()
y.columns = y.columns.str.replace('__UNPROCESSED.', '', regex=False)
else:
y = self.y_processed.loc[self.prediction.index]
if not group_by_column and not self.schema.group_column_name:
y_shifted = y.shift(1).dropna()
metrics = self._calc_metrics(y.loc[y_shifted.index], y_shifted)
return metrics
if self.schema.group_column_name in y.columns:
y_with_group = y
else:
y_with_group = y.join(self.group_column)
y_without_group = y_with_group.loc[:, ~y_with_group.columns.isin(
[self.schema.group_column_name])]
if not group_by_column:
y_shifted = y_with_group.groupby(self.schema.group_column_name).shift(1).dropna()
metrics = self._calc_metrics(y_without_group.loc[y_shifted.index], y_shifted)
else:
metrics = {}
for group, group_df in y_with_group.groupby(self.schema.group_column_name):
y_shifted = group_df.shift(1).dropna()
_metrics = self._calc_metrics(y_without_group.loc[y_shifted.index], y_shifted)
metrics[group] = _metrics
return metrics
def prediction_intervals_onestep(self,
                                 y: Optional[pd.DataFrame] = None,
                                 prediction: Optional[pd.DataFrame] = None,
                                 z_value: float = 1.96) -> pd.DataFrame:
    """
    Calculate one-step prediction intervals using the standard deviation of the residuals.
    See: https://otexts.com/fpp3/prediction-intervals.html#one-step-prediction-intervals

    Parameters
    ----------
    y: pd.DataFrame, optional
        Target data.
        When omitted, the rows matching the stored prediction index are taken from
        `experiment.y_unprocessed` if `preproc_reverse` is true, and from
        `experiment.y_processed` otherwise (see `predict` function).
    prediction: pd.DataFrame, optional
        Model prediction.
        When omitted, `experiment.prediction_reverse_processed` is used if `preproc_reverse`
        is true, and `experiment.prediction` otherwise (see `predict` function).
    z_value: float, optional
        Z value for confidence interval.
        Default value is 1.96.

    Returns
    -------
    result: pd.DataFrame
        For every column of `y`, the columns '<column>_LOWER_BORDER' and '<column>_UPPER_BORDER'.
        None is returned when the prediction or the target data is empty.
    """
    if y is None:
        y = (self.y_unprocessed.loc[self.prediction_reverse_processed.index]
             if self.preproc_reverse
             else self.y_processed.loc[self.prediction.index])

    if prediction is None:
        prediction = self.prediction_reverse_processed if self.preproc_reverse else self.prediction

    if prediction.empty or y.empty:  # pragma: no cover
        logger.warning('Sample is empty, there is no data for calculating metrics.')
        return None

    deltas = OneStepPredictionIntervals().evaluate(y=y,
                                                   prediction=prediction,
                                                   z_value=z_value)
    borders = pd.DataFrame()
    for name in y.columns:
        half_width = deltas[f'{name}_DELTA']
        borders[f'{name}_LOWER_BORDER'] = prediction[name] - half_width
        borders[f'{name}_UPPER_BORDER'] = prediction[name] + half_width
    return borders
# TODO: BootstrapPredictionIntervals still needs to be finalized
def prediction_intervals_bootstrap(self, experiment=None):  # pragma: no cover
    """Evaluate `BootstrapPredictionIntervals` for `experiment` (this instance when omitted)."""
    target = self if experiment is None else experiment
    return BootstrapPredictionIntervals().evaluate(experiment=target)
def save_pyfunc_model(self, path: str):
    """ Save model as pyfunc MLFlow model

    The native model is written to '<path>/native_model' and the pyfunc
    wrapper, which references it as an artifact, to '<path>/pyfunc_model'.

    Parameters
    ----------
    path : str
        Path to save model.
    """
    dsf_model = self.dsf_model
    native_dir = os.path.join(path, "native_model")
    pyfunc_dir = os.path.join(path, "pyfunc_model")

    # the signature is inferred from the test sample
    signature = dsf_model.infer_signature(self.x_test, self.y_test)
    dsf_model.mlflow_register.save_model(
        dsf_model.get_mlflow_model(),
        native_dir,
        signature=signature)

    wrapper = self._pyfunc_wrapper(dsf_model)
    artifacts = {dsf_model.model_name: native_dir}
    mlflow.pyfunc.save_model(
        pyfunc_dir,
        python_model=wrapper,
        artifacts=artifacts,
    )
[docs] def save_model(self, *args, **kwargs):
"""Save `Experiment.dsf_model` to disk, using native `save_model()` from used ML library.
Pass any `args` and `kwargs`, that native ML library consume.
"""
self.dsf_model.save_model(*args, **kwargs)
[docs] def load_model(self, dsf_model_class, *args, **kwargs):
"""Initialize and load model from disk, using native `load_model()` from used ML library.
Pass any `args` and `kwargs`, that native ML library consume.
Loaded model will be set to `dsf_model` attribute of current experiment instance.
Returns
-------
any
loaded model instance
"""
self.dsf_model = dsf_model_class
self.dsf_model.init_model()
model = self.dsf_model.load_model(*args, experiment=self, **kwargs)
self.dsf_model.model = model
return model
[docs] def run(self, x=None, y=None, remote_mode=False):
"""Run full experiment cycle, consisting of 3 stages:
1. Data load and preprocessing
2. Model training
3. Evaluation
Trained model could be found in `Experiment.dsf_model` attribute.
Method will return tuple, containing metrics and predictions calculated on test set,
if test_size is greater then 0, or on whole train set otherwise.
Parameters
----------
x : pd.DataFrame, optional
Input data, by default None
y : pd.DataFrame, optional
Target data, by default None
remote_mode : bool, optional
Flag, indicating if experiment should be run remotely on Ray, by default False
Returns
-------
tuple
metrics, predictions
"""
self.get_data()
self.prepare_data()
self.init_fit(remote_mode=remote_mode)
predictions = self.predict(x=x)
metrics = self.calc_metrics(y=y)
return metrics, predictions
def save_mlflow_run(self):
    """Saves last run to MLFlow and returns run ID.

    A new MLFlow run is started with the current model parameters, the model
    and the current metrics are tracked, and the run is ended.

    Returns:
        str: MLFlow run ID.
    """
    self._start_mlflow_run(self.current_model_params)
    self._mlflow_track_model(self.dsf_model)
    self._mlflow_track_metrics(self.current_metrics)

    run_info = self.mlflow_run.info
    if self.mlflow_url:
        experiment_link = f"{self.mlflow_url}/#/experiments/{run_info.experiment_id}"
        run_link = f"{experiment_link}/runs/{run_info.run_id}"
        logger.info(f"MLFlow experiment link: {experiment_link}")
        logger.info(f"MLFlow logged run link: {run_link}")

    finished_run_id = run_info.run_id
    self.mlflow_run = None
    mlflow.end_run()
    return finished_run_id
def _start_mlflow_run(self, model_params):
    """Open a new MLFlow run and record the experiment description in it.

    A run that is still active is closed first (with a warning). The tracking URI is
    set only when `mlflow_url` is configured. The started run is stored in
    `self.mlflow_run`; model params, serialized config, datafeed hashes and
    identifying tags are logged into it.

    Parameters
    ----------
    model_params : dict
        Model parameters passed to `mlflow.log_params()`.
    """
    leftover_run = mlflow.active_run()
    if leftover_run:
        logger.warning("you've not calculated metrics last run, so metrics are not recorded in MLFlow.")
        self.mlflow_run = None
        mlflow.end_run()

    if self.mlflow_url:
        mlflow.set_tracking_uri(self.mlflow_url)
    mlflow.set_experiment(self.experiment_name)

    self.mlflow_run = mlflow.start_run()

    mlflow.log_params(model_params)
    mlflow.log_dict(self.serialize_config(), 'config.yaml')
    mlflow.log_dict(self.datafeed_hashes, 'datahashes.yaml')

    run_tags = {
        "model_name": self.dsf_model.name,
        "onetickml_version": onetick.ml.__version__,
    }
    mlflow.set_tags(run_tags)
def _pyfunc_wrapper(self, model):
    """Create the pyfunc wrapper instance passed as `python_model=` to `log_model()`.

    The wrapper class is defined here so that it captures the experiment's
    datafeeds, pipeline, schema, model wrapper class and the
    'preprocessing_reverse' prediction setting at the moment of the call.

    Parameters
    ----------
    model : any
        Model wrapper; its `mlflow_register` and `model_name` are used to load
        the underlying model from the MLFlow artifacts.

    Returns
    -------
    mlflow.pyfunc.PythonModel
        Instance of the wrapper class based on pyfunc.PythonModel
    """
    exp_datafeeds = self.datafeeds
    exp_pipeline = self.pipeline
    exp_model_class = self.dsf_model.__class__
    exp_schema = self.schema
    reverse_enabled = self.prediction_params.get('preprocessing_reverse', True)
    exp_group_column = self.schema.group_column_name

    class _ModelWrapper(mlflow.pyfunc.PythonModel, PreparePredictDataMixin):
        # Experiment state captured from the enclosing call.
        datafeeds = exp_datafeeds
        pipeline = exp_pipeline
        dsf_model_class = exp_model_class
        group_column_name = exp_group_column
        schema = exp_schema

        def load_context(self, context):
            """Load the underlying model from the artifact stored under `model.model_name`."""
            artifact_uri = context.artifacts[model.model_name]
            self.wrapped_model = model.mlflow_register.load_model(model_uri=artifact_uri)

        def load_data(self, parameters: Optional[dict] = None):
            """Load data with a new datafeed built from the first datafeed's kwargs.

            Parameters
            ----------
            parameters : dict, optional
                Overrides applied on top of the first datafeed's stored kwargs.

            Returns
            -------
            Result of the datafeed `run()` with index reset.
            """
            # use datafeed params as defaults
            feed_kwargs = self.datafeeds[0]._kwargs.copy()
            if parameters:
                feed_kwargs.update(parameters)

            # replace start/end with datetime objects if it's string (ex: from Airflow DAG)
            for bound in ("start", "end"):
                if bound not in feed_kwargs:
                    continue
                if isinstance(feed_kwargs[bound], str):
                    feed_kwargs[bound] = datetime.datetime.fromisoformat(feed_kwargs[bound])

            # init new datafeed instance and use it to load data
            fresh_feed = self.datafeeds[0].__class__(**feed_kwargs)
            loaded = fresh_feed.load()
            return fresh_feed.run(loaded).reset_index()

        def predict(self, context, model_input):
            """Predict method for MLFlow pyfunc model.

            Parameters
            ----------
            context : mlflow.pyfunc.PythonModelContext
                A PythonModelContext instance containing artifacts that the model can use
                to perform inference.
            model_input : pd.DataFrame
                A pyfunc-compatible input for the model to evaluate.

            Returns
            -------
            pd.DataFrame
                Prediction made by wrapped model.
                Reverse processing applied with respect to Experiment params.
            """
            # work on a deep copy, so the caller's frame is not touched by the pipeline
            features = self.prepare_predict_data(model_input.copy(deep=True))

            # model predict
            predictor = self.dsf_model_class()
            predictor.model = self.wrapped_model
            raw_output = predictor.predict(features)
            prediction = pd.DataFrame(raw_output,
                                      index=features.index,
                                      columns=self.schema.target_columns)

            if not reverse_enabled:
                return prediction

            # reverse processing: walk the pipeline backwards, only preprocess steps take part
            for step in self.pipeline[::-1]:
                if isinstance(step, BasePreprocess):
                    prediction = step.inverse_transform(prediction_df=prediction)
            return prediction.loc[:, self.schema.target_columns]

    return _ModelWrapper()
def _mlflow_track_model(self, model):
if self.log_models and model.mlflow_register:
model_signature = model.infer_signature(self.x_test, self.y_test)
model_info = model.mlflow_register.log_model(
model.get_mlflow_model(),
artifact_path=model.model_name,
registered_model_name=model.model_name,
signature=model_signature)
mlflow.pyfunc.log_model(
f"wrapped_{model.model_name}",
registered_model_name=f"wrapped_{model.model_name}",
python_model=self._pyfunc_wrapper(model),
artifacts={
model.model_name: model_info.model_uri
}
)
def _mlflow_track_metrics(self, estimations):
if self._use_mlflow():
mlflow.log_metrics(estimations)
mlflow.end_run()
def load_mlflow_model(self, run_id: str):
    """Load model from MLFlow Registry from run having specified `run_id`.

    The model wrapper is picked from `self.models` by the run's 'model_name' tag,
    initialized with the run's logged params plus the experiment's train params,
    and the stored model is loaded into it. Resulted model will be set to
    `dsf_model` attribute of current experiment instance; if no model matches,
    `dsf_model` is left untouched.

    Parameters
    ----------
    run_id : str
        ID of the MLFlow run to load the model from.

    Returns
    -------
    model
        Model wrapper (`dsf_model`) with the loaded model set in its `model` attribute.

    Raises
    ------
    AssertionError
        If no model in `self.models` corresponds to the run's 'model_name' tag.
    """
    if self.mlflow_url:
        mlflow.set_tracking_uri(self.mlflow_url)

    run = MlflowClient().get_run(run_id)
    init_params = run.data.params
    tagged_name = run.data.tags['model_name']

    # Look the model up into a local first, so a failed lookup does not wipe `dsf_model`.
    matched = next((m for m in self.models if m.model_name == tagged_name), None)
    if matched is None:
        # The tag is written from the model's `name` attribute when the run is started,
        # so fall back to it in case `name` and `model_name` differ.
        matched = next((m for m in self.models if m.name == tagged_name), None)
    if matched is None:
        # Explicit raise instead of `assert`: the check must survive `python -O`.
        raise AssertionError("Model with specified name not found in experiment models list")
    self.dsf_model = matched

    # initialize dsf_model with params from run
    overfitting_params = self.train_params.get('overfitting')
    if overfitting_params is None:
        overfitting_params = {}
    general_train_params = {
        'loss': self.train_params.get('loss') or 'RMSE',
        'verbose': self.train_params.get('verbose') or DEFAULT_VERBOSE,
    }
    dsf_params = {**overfitting_params, **general_train_params}
    self.dsf_model.init_model(dsf_params, init_params)

    # load model from MLFlow run; the artifact path is the one the model is logged
    # under (`artifact_path=model.model_name`)
    self.dsf_model.model = self.dsf_model.mlflow_register.load_model(
        f"runs:/{run_id}/{self.dsf_model.model_name}")
    return self.dsf_model