# Source code for onetick.ml.impl.experiments.experiment
import os
import copy
import datetime
from functools import reduce
from typing import Optional, Union

import joblib
import mlflow
import pandas as pd
import ray
import onetick.py as otp
from mlflow.tracking import MlflowClient
from ray.util.joblib import register_ray
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV

import onetick.ml
from onetick.ml.impl.data_pipelines.data_pipeline import SelectFeatures, SelectTargets
from onetick.ml.impl.data_pipelines.ot_pipeline import ToPandas
from onetick.ml.utils.schema import DataSchema
from onetick.ml.impl.evaluators import (BootstrapPredictionIntervals,
                                        OneStepPredictionIntervals)
from onetick.ml.interfaces import BaseExperiment
from onetick.ml.interfaces.data_pipelines import BaseFeatures, BaseFilter, BasePreprocess
from onetick.ml.utils import create_folds, logger, params_iterator

# Default verbosity used when `train_params` does not specify `verbose`.
DEFAULT_VERBOSE = 0


class PreparePredictDataMixin:
    """
    Mixin for experiments and wrapped model to prepare data for prediction.

    The host class must provide:

    - ``pipeline``: list of pipeline operators and/or plain strings
      (strings are treated as feature column names);
    - ``schema``: schema object shared with the operators
      (its ``add_features()`` and ``features_columns`` are used here).
    """

    def prepare_predict_data(self, df: pd.DataFrame):
        """
        Prepare data for prediction only.

        - pipeline transformations are applied to the data; operators are not
          fitted, except those that set ``refit_on_predict``, which are
          re-fitted on ``df`` before transforming it;
        - only the feature columns of the result are returned.

        Parameters
        ----------
        df : pd.DataFrame
            Plain data to prepare for prediction. Format is expected to be the
            same as comes from datafeed.

        Returns
        -------
        x : pd.DataFrame
            Prepared data, suitable to be passed in `Experiment.predict()` method.
        """
        src = df
        for proc in self.pipeline:
            # strings in the pipeline are feature names, not operators
            if isinstance(proc, str):
                self.schema.add_features(proc)
                continue
            proc.schema = self.schema
            # some operators must be fitted on the prediction data itself
            if proc.refit_on_predict:
                src = proc.fit(src)
            src = proc.transform(src)
        return src.loc[:, self.schema.features_columns]
[docs]classExperiment(BaseExperiment,PreparePredictDataMixin):""" Implements full cycle to run experiment. """general=dict(experiment_name='default_experiment',log_models=False,mlflow_url=None,)datafeeds=[]splitters=[]pipeline=[]models=[]evaluators=[]features_columns=[]target_columns=[]group_column_name=None# train paramstrain_params=dict(loss='RMSE',verbose=1,overfitting={},search_cv={},n_jobs=-1,)# prediction and evaluation paramsprediction_params={'preprocessing_reverse':True}def__init__(self):# assert self.datafeeds, "datafeed must be defined to run experiment"# assert self.splitters, "splitters must be defined to run experiment"self.df=pd.DataFrame()self.src=Noneself.pipeline_query=Noneself.prediction,self.prediction_reverse_processed=None,Noneself._prediction_input=Noneself.mlflow_run=Noneself.results={}self.dsf_model=Noneself.gscv_model=Noneself.native_model=Noneself.models_cv_score=Noneself.models_cv_results=Noneself.current_model_params=Noneself.model_signature=Noneself.model_result=Noneself.preproc_reverse=Noneself.current_metrics=Noneself.datafeed_hashes={}self.restored_datafeed_hashes=Noneself.general:dict={}ifself.generalisNoneelseself.generalself.train_params:dict={}ifself.train_paramsisNoneelseself.train_paramsself.experiment_name:str=self.general.get('experiment_name','default_experiment')self.log_models=self.general.get('log_models',False)self.mlflow_url=self.general.get('mlflow_url',None)self.wrapped_model=Noneself.schema=DataSchema()self.schema.group_column_name=self.group_column_nameifnotself.features_columns:raiseRuntimeError("No features columns selected. ""List features columns in Experiment.features_columns attribute.")ifnotself.target_columns:raiseRuntimeError("No target columns selected. ""List target columns in Experiment.target_columns attribute.")def_use_mlflow(self)->bool:"""Boolean showing are able to use MLFlow."""returnbool(self.mlflow_urlandself.experiment_name)
[docs]defserialize_config(self):"""Dump current instance config to `dict`, that could be used to reconstruct class by `build_experiment_class()`. """config={}forattrin['general','group_column_name','train_params','features_columns','target_columns']:config[attr]=getattr(self,attr)forattrin['datafeeds','splitters','pipeline',"models","evaluators"]:obj_list=[]forobjingetattr(self,attr):obj_cfg=copy.deepcopy(obj._init_params)obj_list.append(obj_cfg)config[attr]=obj_listconfig['prediction']={'preprocessing_reverse':self.prediction_params['preprocessing_reverse'],}returnconfig
# TODO Should we try always work with dataframes with Time column as index column?
[docs]defget_data(self,datetime_column:Optional[str]=None)->Union[pd.DataFrame,otp.Source]:""" Function of loading external data (e.g. onetick or csv). If an experiment is set to use of several data feeds, then they are merged by index or by the `datetime_column` (if it is specified). Before returning the result, `reset_index` is applied to the dataframe, since in the current version of the framework, preprocessing is based on a numerical index, and not on a datetime field. Parameters ---------- datetime_column: str, optional The field by which the data is merged if the experiment contains several data feeds. Returns ---------- result: pd.DataFrame Dataframe with data that will participate in the subsequent stages of the experiment pipeline (`prepare_data()` function). Important! `reset_index()` was applied to the data (see the description of the function above). The result is also stored in the attribute of an instance of the class Experiment: experiment.df """sources=[]fordatafeedinself.datafeeds:datafeed.schema=self.schemasrc=datafeed.load()sources.append(src)iflen(sources)>1andisinstance(sources[0],otp.Source):src=otp.merge(sources)eliflen(sources)>1andisinstance(sources[0],pd.DataFrame):on_column=Noneleft_index_column=Falseright_index_column=Falseiflen(sources)>1:ifdatetime_columnisNone:left_index_column=Trueright_index_column=Trueelse:on_column=datetime_columnsrc=reduce(lambdaleft,right:pd.merge(left,right,on=on_column,left_index=left_index_column,right_index=right_index_column),sources)eliflen(sources)<1:raiseValueError("No datafeeds were specified")ifisinstance(src,pd.DataFrame):src=src.reset_index()self.src=srcreturnsrc
[docs]defprepare_data(self,src:Optional[Union[pd.DataFrame,otp.Source]]=None):""" Function of preparing data. It includes following steps: features calculation, sample splitting, and data preprocessing. Parameters ---------- src: pd.DataFrame or otp.Source, optional Data to prepare for feeding into a machine learning model. If the parameter is not specified, then data obtained at the previous step of an experiment is taken (the `experiment.df` attribute defined at the `get_data()` function execution stage). Results ---------- The results are stored in the Experiment properties, which is pandas slices from full dataframe: experiment.x_train experiment.x_val experiment.x_test experiment.y_train experiment.y_val experiment.y_test In addition, you can refer to the following attributes: experiment.y_unprocessed - dataframe with not preprocessed (original) target columns, used to calculate metrics """ifsrcisNone:# TODO copy is needed only for case with double call of prepare_data (is it still needed?)src=self.src.copy(deep=True)# source type defines context for schema and pipelineifisinstance(src,pd.DataFrame):self.schema.context='pandas'elifisinstance(src,otp.Source):self.schema.context='onetick'# share schema object between all operatorsforoperatorinself.datafeeds+self.splitters+self.pipeline:ifisinstance(operator,str):continueoperator.schema=self.schema# split data or mark all as trainifself.splitters[0]:src=self.splitters[0].transform(src)else:src["__SPLIT"]="TRAIN"# copy unprocessed target columns to separate columns with prefix "__UNPROCESSED.",# to save original values before running processing pipeline.# this values will be needed to calculate metrics on reverse preprocessed data.self.schema.unprocessed_target_columns=[]op=SelectTargets(self.target_columns)op.schema=self.schemasrc=op.fit_transform(src)# run the whole processing pipelineforprocinself.pipeline:# treat strings as feature namesifisinstance(proc,str):self.schema.add_features(proc)continue# treat 
ToPandas() as context switch from onetick to pandaselifisinstance(proc,ToPandas):ifself.schema.context=="pandas":raiseValueError("ToPandas() operator cannot be used in pandas context")self.schema.context="pandas"self.pipeline_query=srcsrc=self.datafeeds[0].run(src)continue# run operatorsrc=proc.fit(src)src=proc.transform(src)# # track how target columns names are changed,# # to be able to calculate metrics on reverse preprocessed data# proc.update_target_columns()# check if dataframe is empty, and raise error if it is# useful for debugging pipelinesifself.schema.context=="pandas"andsrc.empty:raiseValueError(f"DataFrame become empty on operator: {proc}")# TODO rewrite this part to simplify calling of operatorop=SelectFeatures(self.features_columns)op.schema=self.schemasrc=op.fit_transform(src)logger.info(f'schema.target_columns: {self.schema.target_columns}')logger.info(f'schema.features_columns: {self.schema.features_columns}')# save resulted dataframe to experiment objectifself.schema.context=="pandas":self.df=srcelifself.schema.context=="onetick":self.pipeline_query=srcself.df=self.datafeeds[0].run(src)else:raiseValueError(f"Unknown data context: {self.schema.context}")ifself.df.empty:raiseValueError("Dataframe is empty")forcolumninself.schema.target_columns+self.schema.features_columns:assertcolumninself.df.columns,f'Column {column} is missing in dataframe after pipeline execution'# save hashes of datafeedsfordin['x_train','y_train','x_test','y_test','x_val','y_val','y_unprocessed']:self.datafeed_hashes[d]=joblib.hash(getattr(self,d))# check if datafeeds are changed since experiment was saved in MLFlowifself.restored_datafeed_hashesisnotNone:ifself.restored_datafeed_hashes!=self.datafeed_hashes:logger.warning("Current datafeed hashes are not equal with experiment saved in MLFlow")
@propertydefx_train(self):returnself.df.loc[self.df['__SPLIT']=='TRAIN',self.schema.features_columns]@propertydefy_train(self):returnself.df.loc[self.df['__SPLIT']=='TRAIN',self.schema.target_columns]@propertydefx_test(self):returnself.df.loc[self.df['__SPLIT']=='TEST',self.schema.features_columns]@propertydefy_test(self):returnself.df.loc[self.df['__SPLIT']=='TEST',self.schema.target_columns]@propertydefx_val(self):returnself.df.loc[self.df['__SPLIT']=='VAL',self.schema.features_columns]@propertydefy_val(self):returnself.df.loc[self.df['__SPLIT']=='VAL',self.schema.target_columns]@propertydefgroup_column(self):returnself.df.loc[:,[self.schema.group_column_name]]@propertydefx_processed(self):returnself.df.loc[:,self.schema.features_columns]@propertydefy_processed(self):returnself.df.loc[:,self.schema.target_columns]@propertydefy_unprocessed(self):tmp_df=self.df.loc[self.df['__SPLIT']=='TEST',self.schema.unprocessed_target_columns]tmp_df=tmp_df.rename(columns={col:col.replace('__UNPROCESSED.','')forcolintmp_df.columns})returntmp_df# TODO Implement features_importance funcdeffeatures_importance(self,x_train=None,x_val=None,y_train=None,y_val=None,method='catboost_fi'):# pragma: no coverpass
[docs]definit_fit(self,train_params:Union[dict,None]=None,remote_mode:bool=False):""" Function of training models. It includes the model initialization and model fitting steps. In addition, at this stage, the logic of overfitting control, validation (including cross-validation), and hyperparameter tuning is implemented. Parameters ---------- train_params: dict, optional loss: str, optional One of {'MAE', 'RMSE', 'MAPE', 'MSLE'}. The loss function that is used to train the model. Default value is 'RMSE'. Important! The loss function can also be set in the `init_params` of the model, in which case it overrides the `loss` parameter. overfitting: dict, optional Parameters to limit the overfitting of the model during training. It includes: eval_metric: str One of {'MAE', 'RMSE', 'MAPE', 'R2', 'MSLE'}. The metric calculated on the validation set to control overfitting (determining the best iteration of model training, early stopping of model training). Default value is 'MAE'. early_stopping_rounds: int The maximum number of training iterations that occurs without improving the quality of model prediction on the validation sample. If the `eval_metric` does not improve during the `early_stopping_rounds` iterations, then the model training is completed. If `early_stopping_rounds` is not set or equal to zero, then early stopping is disabled and the model will be trained for the full number of iterations. Default value is 0. search_cv: dict, optional Validation and hyperparameter tuning settings. It includes: val_type: str The type of validation, one of {'Simple', 'Cross', 'WalkForward'}. Default is 'Simple'. folds: int The number of folds (used only for 'Cross' and 'WalkForward' validation). Default is 5. eval_metric: str One of {'MAE', 'RMSE', 'MAPE', 'R2', 'MSLE'}. The metric calculated on validation samples in order to determine the best set of hyperparameters when tuning them. 
In addition, when using cross and walk-forward validation, metric calculation results can be obtained using the `experiment.gscv_model.cv_results_` attribute (for example, to estimate the error variance). Default is 'MAE'. search_optimization: str Hyperparameter search method, one of {'grid', 'random', 'bayesian', 'bohb', 'hyperopt', 'none'} If value is `'none'`, then only first params combination used to train model. Default is 'grid'. n_trials: int Number of search iterations, used for 'random', 'bayesian', 'bohb', 'hyperopt' types. Default is 10. remote_mode: bool, optional If True, then using joblib with ray backand to parallelize processes on remote Ray workers|cluster. If False, then using joblib with threading backend to parallelize processes locally. Default value is False. Returns ------- result: onetick.ml.impl.models.RegressorModel Dataframe with data that will participate in the subsequent stages of the experiment pipeline. RegressorModel attributes: experiment.current_model_params: dict the best model parameters found as a result of tuning hyperparameters (when tuning is disabled, the first element from the list of values is selected for each hyperparameter) experiment.gscv_model: GridSearchCV/RandomizedSearchCV Scikit-learn GridSearchCV/RandomizedSearchCV model object experiment.model_result: object result of GridSearchCV/RandomizedSearchCV model `fit()` function experiment.dsf_model: object DS Framework model object, the `model` attribute of this object stores the original trained ML model (e.g. XGBoost or sklearn model). 
"""assertself.models,"Experiment.models must be defined to run .init_fit()"tune_metric_map={'R2':'r2','MAE':'neg_mean_absolute_error','RMSE':'neg_root_mean_squared_error','MSLE':'neg_mean_squared_log_error','MAPE':'neg_mean_absolute_percentage_error'}train_params=self.train_paramsiftrain_paramsisNoneelsetrain_paramsgeneral_train_params={'loss':train_params.get('loss')or'RMSE','verbose':train_params.get('verbose',DEFAULT_VERBOSE)}overfitting_params=train_params.get('overfitting')or{}search_cv_params:dict=train_params.get('search_cv')or{}search_optimization=search_cv_params.get('search_optimization','grid')scoring=tune_metric_map[search_cv_params.get('eval_metric','MAE')]val_type=search_cv_params.get('val_type','Simple')x_train=self.x_trainy_train=self.y_trainx_val=self.x_valy_val=self.y_valis_val_set_empty=Falseifx_val.emptyory_val.empty:# pragma: no coverlogger.warning('Validation sample is empty, so validation is disabled.')is_val_set_empty=Truedsf_models={}models_cv_score={}models_cv_results=pd.DataFrame()model_result={}formodel_num,modelinenumerate(self.models):init_params_grid=model.init_paramsor{}model.is_val_set_empty=is_val_set_emptyifsearch_optimization=='none':model_params=list(params_iterator(init_params_grid))[0]init_params_grid={k:[v]fork,vinmodel_params.items()}logger.warning(f'Optimization is disabled, the first combination of parameters are used: {model_params}')# init modeldsf_params={**overfitting_params,**general_train_params}dsf_model=modeldsf_model.init_model(dsf_params,init_params_grid)# todo it needs to change simple validationcv=create_folds(x_train=x_train,x_val=x_val,y_val=y_val,# val_size=self.splitters[0].val_size,# test_size=self.splitters[0].test_size,val_type=val_type,folds_num=search_cv_params.get('folds',5))# TODO turn off refit for None and Simple??# init *SearchCVlogger.info(f"Training model: {model.name}. 
<dsf_params>: {dsf_params} "f"<init_params>: {init_params_grid} <fit_params>:{model.fit_params}")sklearn_params=dict(# early_stopping=search_cv_params['early_stopping'],scoring=scoring,cv=cv,refit=False,verbose=self.train_params.get('verbose',DEFAULT_VERBOSE),)ifsearch_optimizationin['grid','none']:dsf_model.sklearn_searcher=GridSearchCV(dsf_model.model,init_params_grid,**sklearn_params)elifsearch_optimization=='random':sklearn_params['n_iter']=search_cv_params.get('n_trials',10)dsf_model.sklearn_searcher=RandomizedSearchCV(dsf_model.model,init_params_grid,**sklearn_params)else:raiseException(f"'{search_optimization}' search_optimization is currently not supported, ""please use 'grid', 'random' or 'none'")eval_set=Noneifis_val_set_emptyelse[(x_val,y_val)]joblib_backend_name="threading"ifremote_mode:ifray.is_initialized():joblib_backend_name="ray"register_ray()else:raiseException("Ray is not initialized, please use ray.init() to use remote_mode=True")ifval_type=='Simple':x=pd.concat([x_train,x_val],axis=0)y=pd.concat([y_train,y_val],axis=0)else:x=x_trainy=y_trainwithjoblib.parallel_backend(joblib_backend_name,n_jobs=self.train_params.get("n_jobs",-1)):# model fit + hyper-parameters search (if enabled)model_result[f'{model.name}_id{model_num}']=dsf_model.fit(x,y,eval_set=eval_set)# prepare and save intermediate resultsdsf_models[f'{model.name}_id{model_num}']=dsf_modelmodels_cv_score[f'{model.name}_id{model_num}']=dsf_model.sklearn_searcher.best_score_cv_results=pd.DataFrame(dsf_model.sklearn_searcher.cv_results_)cv_results['model']=f'{model.name}_id{model_num}'models_cv_results=pd.concat([models_cv_results,cv_results],ignore_index=True)models_cv_results=models_cv_results.loc[:,~models_cv_results.columns.str.contains('param_')]# prepare final 
resultsmodels_cv_results.insert(0,'model',models_cv_results.pop('model'))models_cv_results=models_cv_results.drop('rank_test_score',axis=1)best_dsf_model=max(models_cv_score,key=models_cv_score.get)dsf_model=dsf_models[best_dsf_model]# save final resultsself.models_cv_score=models_cv_scoreself.models_cv_results=models_cv_resultsself.gscv_model=dsf_model.sklearn_searcherself.current_model_params=dsf_model.sklearn_searcher.best_params_self.model_result=model_resultlogger.info(f'Best params: {self.current_model_params}')# refit on best paramsdsf_model.init_model(dsf_params,self.current_model_params)dsf_model.fit(x_train,y_train,eval_set=eval_set)self.native_model=dsf_model.modelself.dsf_model=dsf_modelreturndsf_model
[docs]defpredict(self,x=None,model=None,preproc_reverse=None):""" Function of predicting results using a trained model. Parameters ---------- x: pd.DataFrame, optional Data containing features on the basis of which it needs to make a prediction. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.self.x_processed` (see `prepare_data` function above). model: object (it depends on used model), optional Trained model. If the parameter is not specified, then model obtained at the previous step of an experiment is taken: `experiment.dsf_model` (see `init_fit` function above). preproc_reverse: bool, optional Enable/disable reverse data processing. In case the value is True, if processing was applied to the target values at the data preparation stage, then reverse processing is applied to the received predicted values. Default value is True. Returns ------- result: pd.DataFrame Model prediction data. In addition, the results obtained are stored in the attributes of the experiment class: experiment.prediction - the original prediction of the model experiment.prediction_reverse_processed - prediction of the model after applying reverse processing, in the case when `preproc_reverse = False`, the parameter value is None """ifmodelisNone:model=self.dsf_modelifxisNone:ifself.x_testisnotNoneandnotself.x_test.empty:x=self.x_testelse:raiseValueError('The `predict()` function uses a test sample by default (`x = exp.x_test`), ''but it seems to be empty. 
Please, adjust your splitters to fill test sample,''or specify the `x` parameter explicitly.')ifpreproc_reverseisNone:preproc_reverse=self.prediction_params.get('preprocessing_reverse',True)self.preproc_reverse=preproc_reverseresult=model.predict(x)prediction=pd.DataFrame(result,index=x.index,columns=self.schema.target_columns)self.prediction=prediction# add column with values used for grouping (by symbol in most cases)ifself.schema.group_column_nameisnotNone:prediction=prediction.join(self.group_column)# run reverse preprocessing if neededifpreproc_reverse:# add utility columns to prediction, if they were used in reverse transformationprediction=prediction.join(self.df[self.schema.utility_columns])# run pipeline in reverse orderforpreprocinself.pipeline[::-1]:ifhasattr(preproc,'inverse_transform'):prediction=preproc.inverse_transform(prediction_df=prediction)self.prediction_reverse_processed=prediction.loc[:,self.schema.target_columns]returnself.prediction_reverse_processed# or just return prediction without reverse preprocessingreturnself.prediction
[docs]defcalc_metrics(self,y=None,prediction=None,group_by_column:bool=False):""" Function of calculating metrics. Parameters ---------- y: pd.DataFrame, optional Target data. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.y_unprocessed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict` function above) `experiment.y_processed` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict` function above) Obviously, only the indices present in the prediction are selected from the target data. If `group_by_column` is True, then `y` must contain the column specified in `Experiment.group_column_name`. prediction: pd.DataFrame, optional Model prediction. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.prediction_reverse_processed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict` function above) `experiment.prediction` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict` function above) group_by_column: bool, optional Apply grouping by column during metric calculation. Each metric is calculated for each group separately. If the parameter is False, then the data is not grouped. Resulting metrics are returned as a dictionary of dictionaries. Default value is False. Returns ------- result: dict or dict of dict Calculated metric values in dict. If `group_by_column` is True, then the result is a dictionary of dictionaries. 
"""ifyisNone:y=self.y_testifself.preproc_reverse:y=self.y_unprocessed.loc[self.prediction_reverse_processed.index]ifpredictionisNone:ifself.preproc_reverse:prediction=self.prediction_reverse_processed[self.schema.target_columns]else:prediction=self.prediction[self.schema.target_columns]ifprediction.emptyory.empty:# pragma: no coverlogger.warning('Sample is empty, there is no data for calculating metrics.')returnNoneifgroup_by_columnisFalse:metrics=self._calc_metrics(y,prediction)self.current_metrics=metricsreturnmetricselse:metrics={}_prediction=predictionifself.schema.group_column_namenotinprediction.columns:ifself.schema.group_column_namenotinself.df:_prediction=_prediction.join(y[self.schema.group_column_name],how='inner')else:_prediction=_prediction.join(self.group_column,how='inner')forgroup,group_dfin_prediction.groupby(self.schema.group_column_name):_metrics=self._calc_metrics(y=y.loc[group_df.index,~y.columns.isin([self.schema.group_column_name])],prediction=_prediction.loc[group_df.index,~_prediction.columns.isin([self.schema.group_column_name])])metrics[group]=_metricsself.current_metrics=metricsreturnmetrics
[docs]defcalc_baseline(self,y:pd.DataFrame=None,group_by_column:bool=False)->dict:""" Function of building a baseline model and calculating its metrics. The following model is used as a baseline: the current predicted value of the time series is equal to the previous actual value of the time series. Parameters ---------- y: pd.DataFrame, optional Target data. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.y_unprocessed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict` function above) `experiment.y_processed` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict` function above) Obviously, only the indices present in the prediction are selected from the target data. group_by_column: bool, optional Apply grouping by column. If the parameter is False, then the data is not grouped. Each metric is calculated for each group separately. Resulting metrics are returned as a dictionary of dictionaries. Default value is False. Returns ---------- result: dict Calculated metric values of the baseline model. If `group_by_column` is specified, then the result is a dictionary of dictionaries. 
"""ifyisNone:ifself.preproc_reverse:y=self.y_unprocessed.loc[self.prediction_reverse_processed.index].copy()y.columns=y.columns.str.replace('__UNPROCESSED.','',regex=False)else:y=self.y_processed.loc[self.prediction.index]ifnotgroup_by_columnandnotself.schema.group_column_name:y_shifted=y.shift(1).dropna()metrics=self._calc_metrics(y.loc[y_shifted.index],y_shifted)returnmetricsifself.schema.group_column_nameiny.columns:y_with_group=yelse:y_with_group=y.join(self.group_column)y_without_group=y_with_group.loc[:,~y_with_group.columns.isin([self.schema.group_column_name])]ifnotgroup_by_column:y_shifted=y_with_group.groupby(self.schema.group_column_name).shift(1).dropna()metrics=self._calc_metrics(y_without_group.loc[y_shifted.index],y_shifted)else:metrics={}forgroup,group_dfiny_with_group.groupby(self.schema.group_column_name):y_shifted=group_df.shift(1).dropna()_metrics=self._calc_metrics(y_without_group.loc[y_shifted.index],y_shifted)metrics[group]=_metricsreturnmetrics
[docs]defprediction_intervals_onestep(self,y:Optional[pd.DataFrame]=None,prediction:Optional[pd.DataFrame]=None,z_value:float=1.96)->pd.DataFrame:""" Function of calculating one-step prediction interval using the standard deviation of the residuals. See: https://otexts.com/fpp3/prediction-intervals.html#one-step-prediction-intervals Parameters ---------- y: pd.DataFrame, optional Target data. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.y_unprocessed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict` function above) `experiment.y_processed` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict` function above) Obviously, only the indices present in the prediction are selected from the target data. prediction: pd.DataFrame, optional Model prediction. If the parameter is not specified, then data obtained at the previous steps of an experiment is taken: `experiment.prediction_reverse_processed` (see `prepare_data` function above) if `preproc_reverse = True` (see `predict` function above) `experiment.prediction` (see `prepare_data` function above) if `preproc_reverse = False` (see `predict` function above) z_value: float, optional Z value for confidence interval. Default value is 1.96. Returns ---------- result: pd.DataFrame Calculated prediction intervals. 
"""ifyisNone:ifself.preproc_reverse:y=self.y_unprocessed.loc[self.prediction_reverse_processed.index]else:y=self.y_processed.loc[self.prediction.index]ifpredictionisNone:ifself.preproc_reverse:prediction=self.prediction_reverse_processedelse:prediction=self.predictionifprediction.emptyory.empty:# pragma: no coverlogger.warning('Sample is empty, there is no data for calculating metrics.')returnNoneconfidence=OneStepPredictionIntervals()delta_vals=confidence.evaluate(y=y,prediction=prediction,z_value=z_value)pi_df=pd.DataFrame()forcolumniny.columns:pi_df[f'{column}_LOWER_BORDER']=prediction[column]-delta_vals[f'{column}_DELTA']pi_df[f'{column}_UPPER_BORDER']=prediction[column]+delta_vals[f'{column}_DELTA']returnpi_df
# TODO it needs to finalize BootstrapPredictionIntervalsdefprediction_intervals_bootstrap(self,experiment=None):# pragma: no coverifexperimentisNone:experiment=selfconfidence=BootstrapPredictionIntervals()pi_df=confidence.evaluate(experiment=experiment)returnpi_df
[docs]defsave_pyfunc_model(self,path:str):""" Save model as pyfunc MLFlow model Parameters ---------- path : str Path to save model. """native_model_path=os.path.join(path,"native_model")pyfunc_model_path=os.path.join(path,"pyfunc_model")model_signature=self.dsf_model.infer_signature(self.x_test,self.y_test)self.dsf_model.mlflow_register.save_model(self.dsf_model.get_mlflow_model(),native_model_path,signature=model_signature)mlflow.pyfunc.save_model(pyfunc_model_path,python_model=self._pyfunc_wrapper(self.dsf_model),artifacts={self.dsf_model.model_name:native_model_path})
[docs]defsave_model(self,*args,**kwargs):"""Save `Experiment.dsf_model` to disk, using native `save_model()` from used ML library. Pass any `args` and `kwargs`, that native ML library consume. """self.dsf_model.save_model(*args,**kwargs)
[docs]defload_model(self,dsf_model_class,*args,**kwargs):"""Initialize and load model from disk, using native `load_model()` from used ML library. Pass any `args` and `kwargs`, that native ML library consume. Loaded model will be set to `dsf_model` attribute of current experiment instance. Returns ------- any loaded model instance """self.dsf_model=dsf_model_classself.dsf_model.init_model()model=self.dsf_model.load_model(*args,experiment=self,**kwargs)self.dsf_model.model=modelreturnmodel
[docs]defrun(self,x=None,y=None,remote_mode=False):"""Run full experiment cycle, consisting of 3 stages: 1. Data load and preprocessing 2. Model training 3. Evaluation Trained model could be found in `Experiment.dsf_model` attribute. Method will return tuple, containing metrics and predictions calculated on test set, if test_size is greater then 0, or on whole train set otherwise. Parameters ---------- x : pd.DataFrame, optional Input data, by default None y : pd.DataFrame, optional Target data, by default None remote_mode : bool, optional Flag, indicating if experiment should be run remotely on Ray, by default False Returns ------- tuple metrics, predictions """self.get_data()self.prepare_data()self.init_fit(remote_mode=remote_mode)predictions=self.predict(x=x)metrics=self.calc_metrics(y=y)returnmetrics,predictions
[docs]defsave_mlflow_run(self):"""Saves last run to MLFlow and returns run ID. Returns: str: MLFlow run ID. """self._start_mlflow_run(self.current_model_params)self._mlflow_track_model(self.dsf_model)self._mlflow_track_metrics(self.current_metrics)ifself.mlflow_url:experiment_link=f"{self.mlflow_url}/#/experiments/{self.mlflow_run.info.experiment_id}"run_link=f"{experiment_link}/runs/{self.mlflow_run.info.run_id}"logger.info(f"MLFlow experiment link: {experiment_link}")logger.info(f"MLFlow logged run link: {run_link}")run_id=self.mlflow_run.info.run_idself.mlflow_run=Nonemlflow.end_run()returnrun_id
def _start_mlflow_run(self, model_params):
    """Start a new MLFlow run and log params, config, data hashes and tags.

    If an MLFlow run is still active (e.g. metrics were never tracked for the
    previous run), it is ended first with a warning.

    Parameters
    ----------
    model_params : dict
        Model parameters passed to `mlflow.log_params()`.
    """
    if mlflow.active_run():
        logger.warning("you've not calculated metrics last run, so metrics are not recorded in MLFlow.")
        self.mlflow_run = None
        mlflow.end_run()
    if self.mlflow_url:
        mlflow.set_tracking_uri(self.mlflow_url)
    mlflow.set_experiment(self.experiment_name)
    self.mlflow_run = mlflow.start_run()
    mlflow.log_params(model_params)
    mlflow.log_dict(self.serialize_config(), 'config.yaml')
    mlflow.log_dict(self.datafeed_hashes, 'datahashes.yaml')
    mlflow.set_tags({
        # `load_mlflow_model()` matches this tag against `model.model_name`,
        # so record that same attribute here.
        "model_name": self.dsf_model.model_name,
        "onetickml_version": onetick.ml.__version__,
    })


def _pyfunc_wrapper(self, model):
    """Build the wrapper passed as `python_model=` to `mlflow.pyfunc.log_model()`.

    Parameters
    ----------
    model : any
        Experiment model to wrap (called from `_mlflow_track_model()`). Its
        `mlflow_register` flavor is used at load time to load the native model
        artifact stored under `model.model_name`.

    Returns
    -------
    mlflow.pyfunc.PythonModel
        Instance of a `mlflow.pyfunc.PythonModel` subclass that applies the
        experiment pipeline, predicts with the wrapped model and optionally
        reverses preprocessing.
    """
    _datafeeds = self.datafeeds
    _pipeline = self.pipeline
    _dsf_model_class = self.dsf_model.__class__
    _schema = self.schema
    preproc_reverse = self.prediction_params.get('preprocessing_reverse', True)
    _group_column_name = self.schema.group_column_name
    # Capture only what `load_context` needs rather than the whole `model`:
    # MLFlow pickles the wrapper, and closing over `model` would serialize the
    # trained model a second time next to the artifact already logged by
    # `mlflow_register.log_model()`.
    _mlflow_register = model.mlflow_register
    _model_name = model.model_name

    class _ModelWrapper(mlflow.pyfunc.PythonModel, PreparePredictDataMixin):
        datafeeds = _datafeeds
        pipeline = _pipeline
        dsf_model_class = _dsf_model_class
        group_column_name = _group_column_name
        schema = _schema

        def load_context(self, context):
            """Load the native model from the artifact logged under the model name."""
            self.wrapped_model = _mlflow_register.load_model(model_uri=context.artifacts[_model_name])

        def load_data(self, parameters: Optional[dict] = None):
            """Load data with a new instance of the first datafeed's class.

            Parameters
            ----------
            parameters : dict, optional
                Overrides for the first datafeed's `_kwargs`. String ``start``/``end``
                values are parsed with `datetime.datetime.fromisoformat()`.

            Returns
            -------
            pd.DataFrame
                Result of the datafeed's `run()` with the index reset.
            """
            # use datafeed params as defaults
            kwargs = self.datafeeds[0]._kwargs.copy()
            if parameters:
                kwargs.update(parameters)
            # replace start/end with datetime objects if it's string (ex: from Airflow DAG)
            for key in ["start", "end"]:
                if key in kwargs and isinstance(kwargs[key], str):
                    kwargs[key] = datetime.datetime.fromisoformat(kwargs[key])
            # init new datafeed instance and use it to load data
            datafeed = self.datafeeds[0].__class__(**kwargs)
            src = datafeed.load()
            return datafeed.run(src).reset_index()

        def predict(self, context, model_input):
            """
            Predict method for MLFlow pyfunc model.

            Parameters
            ----------
            context : mlflow.pyfunc.PythonModelContext
                A PythonModelContext instance containing artifacts that the model
                can use to perform inference.
            model_input : pd.DataFrame
                A pyfunc-compatible input for the model to evaluate.

            Returns
            -------
            pd.DataFrame
                Prediction made by wrapped model, restricted to the target columns.
                Reverse processing is applied with respect to Experiment params.
            """
            _df = model_input.copy(deep=True)
            x = self.prepare_predict_data(_df)
            # model predict
            dsf_model = self.dsf_model_class()
            dsf_model.model = self.wrapped_model
            result = dsf_model.predict(x)
            prediction = pd.DataFrame(result, index=x.index, columns=self.schema.target_columns)
            # reverse processing, undoing preprocessing steps in reverse order
            if preproc_reverse:
                for preproc in self.pipeline[::-1]:
                    if isinstance(preproc, BasePreprocess):
                        prediction = preproc.inverse_transform(prediction_df=prediction)
                return prediction.loc[:, self.schema.target_columns]
            return prediction

    return _ModelWrapper()


def _mlflow_track_model(self, model):
    """Log the native model and a pyfunc wrapper around it to the active run.

    Does nothing unless `log_models` is enabled and the model has an
    `mlflow_register` flavor. The native model is logged and registered under
    `model.model_name`. The pyfunc wrapper is logged and registered under
    ``wrapped_<model_name>``, with the native model URI attached as an artifact.
    """
    if self.log_models and model.mlflow_register:
        # NOTE(review): assumes `x_test` / `y_test` were set by an earlier
        # train/evaluate step (they are not initialized in `__init__`).
        model_signature = model.infer_signature(self.x_test, self.y_test)
        model_info = model.mlflow_register.log_model(
            model.get_mlflow_model(),
            artifact_path=model.model_name,
            registered_model_name=model.model_name,
            signature=model_signature,
        )
        mlflow.pyfunc.log_model(
            f"wrapped_{model.model_name}",
            registered_model_name=f"wrapped_{model.model_name}",
            python_model=self._pyfunc_wrapper(model),
            artifacts={model.model_name: model_info.model_uri},
        )


def _mlflow_track_metrics(self, estimations):
    """Log `estimations` as metrics and end the active run, when MLFlow is in use."""
    if self._use_mlflow():
        mlflow.log_metrics(estimations)
        mlflow.end_run()
def load_mlflow_model(self, run_id: str):
    """Load the model logged by the MLFlow run with the given `run_id`.

    The experiment model whose `model_name` equals the run's ``model_name`` tag
    is initialized with the current `train_params` and the run's logged params.
    Its native model is then loaded from the run's artifacts. The resulting model
    is set to the `dsf_model` attribute of the current experiment instance.

    Args:
        run_id (str): MLFlow run ID, e.g. as returned by `save_mlflow_run()`.

    Returns:
        model: loaded model instance

    Raises:
        AssertionError: if no model in `self.models` matches the run's
            ``model_name`` tag.
        KeyError: if the run has no ``model_name`` tag (and `self.models`
            is not empty).
    """
    if self.mlflow_url:
        mlflow.set_tracking_uri(self.mlflow_url)
    client = MlflowClient()
    run = client.get_run(run_id)
    # NOTE(review): MLFlow stores params as strings, so these values are `str`
    # even for numeric hyperparameters; presumably `init_model` converts them.
    init_params = run.data.params

    self.dsf_model = None
    for model in self.models:
        if model.model_name == run.data.tags['model_name']:
            self.dsf_model = model
            break
    # Explicit check instead of `assert`, which is stripped under `python -O`;
    # `is None` avoids rejecting a found model object that happens to be falsy.
    # AssertionError is kept as the exception type for backward compatibility.
    if self.dsf_model is None:
        raise AssertionError("Model with specified name not found in experiment models list")

    # initialize dsf_model with params from run
    overfitting_params = self.train_params.get('overfitting') or {}
    general_train_params = {
        'loss': self.train_params.get('loss') or 'RMSE',
        'verbose': self.train_params.get('verbose') or DEFAULT_VERBOSE,
    }
    dsf_params = {**overfitting_params, **general_train_params}
    self.dsf_model.init_model(dsf_params, init_params)

    # load model from MLFlow run; the path must match the one used when logging
    # in `_mlflow_track_model()` (`artifact_path=model.model_name`)
    self.dsf_model.model = self.dsf_model.mlflow_register.load_model(
        f"runs:/{run_id}/{self.dsf_model.model_name}"
    )
    return self.dsf_model