import tempfile
import yaml
from typing import Union, Literal, Optional
import mlflow
from mlflow.tracking import MlflowClient
from sklearn.model_selection import KFold, TimeSeriesSplit, train_test_split
from onetick.ml.utils.builder import ExperimentBuilder
def combinations(opt_params: list):
    """Return all combinations of values picked from a list of value lists.

    This is the n-fold Cartesian product: each combination takes exactly one
    value from each inner list, preserving input order.

    Parameters
    ----------
    opt_params : list
        List of lists of candidate values, one inner list per parameter.

    Returns
    -------
    list
        List of combinations, each combination itself a list. For an empty
        input the historical sentinel ``[0]`` is returned so callers (e.g.
        ``params_iterator``) still iterate exactly once and produce an empty
        parameter dict.
    """
    # Local import keeps the file-level import block untouched.
    from itertools import product

    if not opt_params:
        # Preserve the historical sentinel for an empty grid.
        return [0]
    # itertools.product also fixes the single-parameter case: the previous
    # hand-rolled version returned [values] instead of [[v] for v in values],
    # which made params_iterator's zip() truncate to only the first value.
    # It additionally avoids the IndexError the old code hit when an inner
    # list was empty (now correctly yields no combinations).
    return [list(combo) for combo in product(*opt_params)]
def params_iterator(model_params):
    """Iterate over grid returning combinations.

    Parameters
    ----------
    model_params : dict
        Grid of parameters to iterate over

    Yields
    ------
    dict
        One combination of parameters from grid.
    """
    names = list(model_params.keys())
    all_value_rows = combinations(list(model_params.values()))
    for value_row in all_value_rows:
        # An empty grid yields a single empty parameter dict.
        yield dict(zip(names, value_row)) if names else {}
def build_experiment_class(config: Optional[Union[dict, str]] = None, globals_dict: Optional[dict] = None):
    """Build `onetick.ml.Experiment` class from `dict`-config.

    Parameters
    ----------
    config : dict or str, optional
        Configuration `dict` with settings tree (a str is also accepted and
        forwarded to ``ExperimentBuilder`` — presumably a config file path;
        TODO confirm). Defaults to an empty dict.
    globals_dict : dict, optional
        Supply `globals()` dict, to restore custom classes. Defaults to {}.

    Returns
    -------
    any
        `onetick.ml.Experiment` class built from config `dict`
    """
    # Use None sentinels instead of mutable {} defaults: a shared default
    # dict could leak state between calls if ExperimentBuilder mutates it.
    config = {} if config is None else config
    globals_dict = {} if globals_dict is None else globals_dict
    builder = ExperimentBuilder(config, globals_dict)
    return builder.build_experiment_class()
def build_experiment(config: Optional[Union[dict, str]] = None, globals_dict: Optional[dict] = None):
    """Build `onetick.ml.Experiment` instance from `dict`-config.

    Parameters
    ----------
    config : dict or str, optional
        Configuration `dict` with settings tree. Defaults to an empty dict.
    globals_dict : dict, optional
        Supply `globals()` dict, to restore custom classes. Defaults to {}.

    Returns
    -------
    any
        `onetick.ml.Experiment` class instance built from config `dict`
    """
    # None sentinels avoid the shared-mutable-default pitfall; the effective
    # defaults ({} and {}) are unchanged for existing callers.
    config = {} if config is None else config
    globals_dict = {} if globals_dict is None else globals_dict
    # Build the class, then instantiate it with no arguments.
    return build_experiment_class(config, globals_dict)()
def restore_experiment_from_mlflow(run_id: str, mlflow_url: Optional[str] = None):
    """Builds Experiment class from YAML config loaded from MLFlow.

    Parameters
    ----------
    run_id : str
        MLFlow Run ID to restore from
    mlflow_url : Optional[str], optional
        MLFlow Tracking URI. Defaults to None.

    Returns
    -------
    onetick.ml.Experiment: instance inherited from `onetick.ml.Experiment` class restored from MLFlow run
    """
    if mlflow_url:
        # Point the global mlflow client at the requested tracking server
        # before any artifact lookups happen.
        mlflow.set_tracking_uri(mlflow_url)
    client = MlflowClient()
    # Artifacts are downloaded into a temporary directory that is removed
    # automatically when the `with` block exits.
    with tempfile.TemporaryDirectory(suffix=run_id) as tmpdirname:
        config_path = client.download_artifacts(run_id, "config.yaml", tmpdirname)
        # Rebuild the Experiment class from the run's stored config.
        builder = ExperimentBuilder(config_path)
        RestoredExperiment = builder.build_experiment_class()
        hashes_file = client.download_artifacts(run_id, "datahashes.yaml", tmpdirname)
        with open(hashes_file) as fp:
            # NOTE(review): yaml.Loader can instantiate arbitrary Python
            # objects; acceptable only because the artifact originates from
            # our own MLFlow run — never use on untrusted input.
            restored_datafeed_hashes = yaml.load(fp, Loader=yaml.Loader)
        experiment_restored = RestoredExperiment()
        # Load the trained model belonging to this run into the instance.
        experiment_restored.load_mlflow_model(run_id)
        # Attach the training-time data hashes — presumably used later to
        # detect datafeed changes against the original run; TODO confirm.
        experiment_restored.restored_datafeed_hashes = restored_datafeed_hashes
        return experiment_restored
def create_folds(x_train=None,
                 x_val=None, y_val=None,
                 val_type: Literal['None', 'Simple', 'Cross', 'WalkForward'] = 'None',
                 folds_num=5):
    """Create folds for cross-validation.

    Parameters
    ----------
    x_train : pandas.DataFrame, optional
        Features for training, by default None
    x_val : pandas.DataFrame, optional
        Features for validation, by default None
    y_val : pandas.DataFrame, optional
        Targets for validation, by default None
    val_type : Literal["None", "Simple", "Cross", "WalkForward"], optional
        Cross-validation type, by default "None"
    folds_num : int, optional
        Number of folds, by default 5

    Returns
    -------
    any
        Folds indices generator used for cross-validation
        (`GridSearchCV` or `RandomizedSearchCV`) in `onetick.ml.Experiment`

    Raises
    ------
    ValueError
        If `val_type` is not one of the supported values.
    """
    if x_val is None or y_val is None or val_type == 'None':
        # Single "fold" spanning all rows: no held-out validation split.
        return [(slice(None), slice(None))]
    if val_type == 'Simple':
        # One predefined fold: train rows first, validation rows appended
        # after them — assumes the estimator is fit on x_train concatenated
        # with x_val; TODO confirm against the caller.
        n_train = len(x_train.index)
        train_indices = list(range(n_train))
        val_indices = list(range(n_train, n_train + len(x_val.index)))
        return [(train_indices, val_indices)]
    if val_type == 'Cross':
        return KFold(n_splits=folds_num, shuffle=False)
    if val_type == 'WalkForward':
        return TimeSeriesSplit(n_splits=folds_num)
    # Previously an unknown val_type silently returned None, which only
    # failed later and far from the cause; fail loudly at the source instead.
    raise ValueError(f"Unsupported val_type: {val_type!r}")
def walk_forward_split_train_test(df, n_splits=50, test_size=50):  # pragma: no cover
    """Split `df` into walk-forward train/test index arrays.

    Uses a sliding training window (roughly ``len(df) / n_splits + 1`` rows)
    with a fixed-size test window after it, one pair per split.
    """
    window = df.shape[0] // n_splits + 1
    splitter = TimeSeriesSplit(max_train_size=window, n_splits=n_splits, test_size=test_size)
    pairs = list(splitter.split(df))
    train_indexes = [train_idx for train_idx, _ in pairs]
    test_indexes = [test_idx for _, test_idx in pairs]
    return train_indexes, test_indexes