import copy
import math
import numpy as np
import pandas as pd
from typing import Optional, Union
import onetick.py as otp
from sklearn import preprocessing
from onetick.ml.interfaces import BasePreprocess
class SKLearnPreprocessor(BasePreprocess):
    """Data preprocessing class that uses an sklearn preprocessor.

    Only preprocessors that do not change the number of columns can be used.
    A separate preprocessor instance is fitted for every selected column.

    Parameters
    ----------
    preprocessor_class : sklearn.preprocessing class
        Class of sklearn preprocessor to be used.
    columns : list, str
        List of columns to which preprocessing will be applied.
        String "__all__" means that all columns will be used.
        Default is "__all__".
    params : dict, optional
        Keyword arguments to be passed to sklearn preprocessor constructor.
        Default is None (no extra arguments).
    fit_on_train : bool
        If True and the data contains a "__SPLIT" column, then the preprocessor
        is fitted only on rows where "__SPLIT" equals "TRAIN".
        Default is False.
    group_by_column : bool
        Forwarded to the base class; presumably enables per-group processing
        (see GroupByColumn). Default is False.
    suffix : str
        Suffix to be added to column name after preprocessing.
        Default is "" (no suffix).
    """

    def __init__(self,
                 preprocessor_class,
                 columns: Union[list, str] = "__all__",
                 params: Optional[dict] = None,
                 fit_on_train: bool = False,
                 group_by_column: bool = False,
                 suffix: str = ""):
        super().__init__(preprocessor_class=preprocessor_class,
                         columns=columns,
                         params=params,
                         fit_on_train=fit_on_train,
                         group_by_column=group_by_column,
                         suffix=suffix)
        self.preprocessor_class = preprocessor_class
        self.fit_on_train = fit_on_train
        self.params = params or {}
        # column name -> fitted sklearn preprocessor instance
        self.preprocessors = {}

    def fit_pandas(self, df: pd.DataFrame):
        """Fit one preprocessor per selected column without transforming the data.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to fit on. Columns are selected with ``self.column_names(df)``.

        Returns
        -------
        pd.DataFrame
            The input dataframe, unchanged.
        """
        # Forget preprocessors from a previous fit so that columns which are no
        # longer selected do not keep stale fitted state.
        self.preprocessors = {}
        # The TRAIN filter does not depend on the column, so compute it once.
        if self.fit_on_train and "__SPLIT" in df.columns:
            fit_df = df[df["__SPLIT"] == "TRAIN"]
        else:
            fit_df = df
        for column in self.column_names(df):
            preprocessor = self.preprocessor_class(**self.params)
            # sklearn expects a 2D array of shape (n_samples, n_features)
            preprocessor.fit(fit_df[column].values.reshape(-1, 1))
            self.preprocessors[column] = preprocessor
        return df
class ApplyLog(BasePreprocess):
    """
    Data preprocessing class that applies a logarithm to data.

    Parameters
    ----------
    base : float, optional
        Base of the logarithm. Must be positive and not equal to 1.
        Default: math.e
    columns : list, str
        List of columns to which preprocessing will be applied.
        Default: "__all__"
    suffix : str
        Suffix to be added to column name after preprocessing.
        Default is "", which means that preprocessing will not create new column
        and apply logarithm to the original column.

    Raises
    ------
    ValueError
        If `base` is not positive or equals 1 (the logarithm is undefined
        or its denominator ``ln(base)`` would be zero/NaN).
    """
    refit_on_predict = True
    deprocessable = True

    def __init__(self,
                 base: Optional[float] = None,
                 columns: Union[list, str] = "__all__",
                 suffix: str = "",
                 ):
        resolved_base = math.e if base is None else base
        # Validate before touching any state: ln(base) is the divisor in transform_ot.
        if resolved_base <= 0 or resolved_base == 1:
            raise ValueError(
                f"Logarithm base must be positive and not equal to 1, got {resolved_base}")
        super().__init__(columns=columns, base=base, suffix=suffix)
        self.base = resolved_base
        self.log_base = np.log(self.base)
        # original column name -> base applied to it; refilled by transform_ot
        self._columns = {}

    def transform_ot(self, src: otp.Source):
        """Write ``ln(column) / ln(base)`` into the target column of every selected column pair.

        Parameters
        ----------
        src : otp.Source
            Source to transform.

        Returns
        -------
        otp.Source
            The transformed source.
        """
        self._columns = {}
        for new_column, column in self.iter_column_pairs(src):
            src[new_column] = otp.math.ln(src[column]) / otp.math.ln(self.base)
            self._columns[column] = self.base
        return src
class LimitOutliers(BasePreprocess):
    """
    Data preprocessing class that limits outliers by using standard deviations.

    The maximum and minimum allowable values are calculated using the formula: `mean ± std_num * std`,
    where `mean` and `std` are the mean value and standard deviation calculated on the training set.
    In `transform_ot`, values outside of this range are replaced with the nearest border.

    Parameters
    ----------
    std_num : float
        The number of standard deviations used to limit outliers.
        Default: 3
    columns : list, str
        List of columns to which preprocessing will be applied.
        Default: "__all__"
    suffix : str
        Suffix to be added to column name after preprocessing.
        Default is "", in which case the limited values are written back to the original column.
    """
    refit_on_predict = False
    deprocessable = False

    def __init__(self,
                 std_num: float = 3,
                 columns: Union[list, str] = "__all__",
                 suffix: str = "",):
        super().__init__(std_num=std_num, columns=columns, suffix=suffix)
        self.std_num = std_num
        self.fitted = False
        # column name -> {'up_border': float, 'down_border': float}; filled by fit_pandas
        self._columns = {}

    def fit_pandas(self, df: pd.DataFrame):
        """Compute outlier borders on data without transforming it.

        If the dataframe has a "__SPLIT" column, only rows where it equals "TRAIN" are used.
        The standard deviation is the sample (unbiased) one, matching ``transform_ot``.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.

        Returns
        -------
        pd.DataFrame
            The input dataframe, unchanged.
        """
        self._columns = {}
        train_df = df[df["__SPLIT"] == "TRAIN"] if "__SPLIT" in df.columns else df
        columns = self.column_names(train_df)
        means = train_df[columns].mean(numeric_only=True)
        stds = train_df[columns].std(numeric_only=True)
        for column in columns:
            spread = self.std_num * stds[column]
            self._columns[column] = {
                'up_border': means[column] + spread,
                'down_border': means[column] - spread,
            }
        self.fitted = True
        return df

    def transform_ot(self, src: otp.Source):
        """Clip selected columns to ``mean ± std_num * std`` computed over the source.

        Statistics are computed on the TRAIN set when the schema's set column is present.
        The helper columns ``__{column}_MEAN``, ``__{column}_STD``, ``__{column}_LOWER`` and
        ``__{column}_HIGHER`` are joined to every tick and remain in the returned source.

        Parameters
        ----------
        src : otp.Source
            Source to transform.

        Returns
        -------
        otp.Source
            The source with limited values in ``f"{column}{self.suffix}"``.
        """
        columns = self.column_names(src)
        agg_dict = {}
        for column in columns:
            agg_dict[f"__{column}_MEAN"] = otp.agg.mean(column)
            agg_dict[f"__{column}_STD"] = otp.agg.stddev(column, biased=False)
        if self.schema.set_name in src.schema:  # fit only on train set, if splitting is used
            data_agg, _ = src[src[self.schema.set_name] == "TRAIN"]
        else:
            data_agg = src
        data_agg = data_agg.agg(agg_dict,
                                bucket_time='start')
        for column in columns:
            data_agg[f"__{column}_LOWER"] = (data_agg[f"__{column}_MEAN"]
                                             - self.std_num * data_agg[f"__{column}_STD"])
            data_agg[f"__{column}_HIGHER"] = (data_agg[f"__{column}_MEAN"]
                                              + self.std_num * data_agg[f"__{column}_STD"])
        # attach the per-column borders to every tick of the original source
        src = otp.join_by_time([src, data_agg], policy="LATEST_TICKS")
        for column in columns:
            # NOTE: the lambda is translated by onetick.py, keep its structure as is
            src[f"{column}{self.suffix}"] = src[column].apply(
                lambda x:
                data_agg[f"__{column}_LOWER"] if x < data_agg[f"__{column}_LOWER"] else
                data_agg[f"__{column}_HIGHER"] if x > data_agg[f"__{column}_HIGHER"] else
                x
            )
        return src
class MinMaxScaler(SKLearnPreprocessor):
    """
    Data preprocessing class that scales data to a given range.

    Parameters
    ----------
    columns : list, optional
        List of columns to which preprocessing will be applied.
        By default None.
    transformed_range : tuple
        Desired range of transformed data.
        Default: (0, 1)
    suffix : str
        Suffix to be added to column name after preprocessing.
        Default: "" (no suffix).
    group_by_column : bool
        Forwarded to SKLearnPreprocessor.
        Default: False
    """
    refit_on_predict = False
    deprocessable = True

    def __init__(self,
                 columns: Optional[list] = None,
                 transformed_range: tuple = (0, 1),
                 suffix: str = "",
                 group_by_column: bool = False):
        self.transformed_range = (0, 1) if transformed_range is None else transformed_range
        super().__init__(
            preprocessor_class=preprocessing.MinMaxScaler,
            fit_on_train=True,
            columns=columns,
            suffix=suffix,
            group_by_column=group_by_column,
            params=dict(feature_range=self.transformed_range)
        )
        # hack to save init params specifically for current class, not for the parent class.
        # `suffix` must be saved too, otherwise an instance re-created from these params
        # would silently write to differently named columns.
        self.save_init_params(dict(
            transformed_range=self.transformed_range,
            columns=columns,
            suffix=suffix,
        ))

    def fit_ot(self, src: otp.Source):
        """Join per-column min/max values to every tick.

        The minimum and maximum are aggregated over the TRAIN set when the schema's set
        column is present, otherwise over the whole source, and are attached as
        ``__{column}_MIN`` / ``__{column}_MAX``. These helper columns are registered in
        ``self.schema.utility_columns`` (once, even if fitted repeatedly) so they are kept
        for the inverse transform.

        Parameters
        ----------
        src : otp.Source
            Source to fit on.

        Returns
        -------
        otp.Source
            The source with the helper columns joined.
        """
        columns = self.column_names(src)
        agg_dict = {}
        for column in columns:
            agg_dict[f"__{column}_MIN"] = otp.agg.min(column)
            agg_dict[f"__{column}_MAX"] = otp.agg.max(column)
        if self.schema.set_name in src.schema:  # fit only on train set, if splitting is used
            data_agg, _ = src[src[self.schema.set_name] == "TRAIN"]
        else:
            data_agg = src
        data_agg = data_agg.agg(agg_dict, bucket_time='start')
        src = otp.join_by_time([src, data_agg], policy="LATEST_TICKS")
        # save min and max values in separate columns for inverse transform;
        # skip names already registered so refitting does not duplicate them
        helper_columns = ([f"__{column}_MIN" for column in columns]
                          + [f"__{column}_MAX" for column in columns])
        for helper_column in helper_columns:
            if helper_column not in self.schema.utility_columns:
                self.schema.utility_columns += [helper_column]
        return src

    def transform_ot(self, src: otp.Source):
        """Scale selected columns to ``transformed_range`` using the min/max joined by ``fit_ot``.

        Uses ``a + (x - min) * (b - a) / (max - min)``; if ``max == min`` for a column the
        denominator is zero.

        Parameters
        ----------
        src : otp.Source
            Source previously passed through ``fit_ot``.

        Returns
        -------
        otp.Source
            The transformed source.
        """
        a = self.transformed_range[0]
        b = self.transformed_range[1]
        for new_column, column in self.iter_column_pairs(src):
            # NOTE(review): the target column is first created as a plain copy; presumably
            # intentional (e.g. to fix its type before the update) — confirm before removing.
            src[new_column] = src[column]
            # NOTE: the lambda is translated by onetick.py, keep its structure as is
            src[new_column] = src[column].apply(
                lambda x:
                a + (x - src[f"__{column}_MIN"]) * (b - a) / (src[f"__{column}_MAX"] - src[f"__{column}_MIN"]))
        return src
class LaggedDifferences(BasePreprocess):
    """
    Data preprocessing class that calculates the difference between the current and lagged values (of time series).

    Note: this preprocessing will make the first `lag` rows contain NaN values. Filter them out before training.

    Parameters
    ----------
    lag : int
        Value of the lag to be used.
        This value equals the number of rows at the beginning of the dataframe
        that will contain NaN after preprocessing.
        Default: 39
    columns : list, str
        List of columns to which preprocessing will be applied.
        Default: "__all__"
    suffix : str
        Suffix to be added to column name after preprocessing.
        Default is "", in which case the differences overwrite the original columns.
    """
    refit_on_predict = True
    deprocessable = True

    def __init__(self,
                 lag: int = 39,
                 columns: Union[list, str] = "__all__",
                 suffix: str = "",):
        super().__init__(columns=columns, lag=lag, suffix=suffix)
        self.lag = lag
        self.suffix = suffix
        # lagged copy of the selected columns, set by fit_pandas
        self._shift_df = None

    def fit_pandas(self, df: pd.DataFrame):
        """Remember the selected columns shifted by ``lag`` rows.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.

        Returns
        -------
        pd.DataFrame
            The input dataframe, unchanged.
        """
        # `.loc[:, list]` and `.shift` both return new frames, so the stored
        # result does not alias `df`.
        self._shift_df = df.loc[:, self.column_names(df)].shift(self.lag)
        return df

    def transform_pandas(self, df: pd.DataFrame):
        """Write ``column - lagged column`` into ``f"{column}{suffix}"`` for every selected column.

        Rows are aligned by index with the data seen in ``fit_pandas``.

        Raises
        ------
        RuntimeError
            If the preprocessor has not been fitted.
        """
        if self._shift_df is None:
            raise RuntimeError("LaggedDifferences cannot be applied without being fitted on data first")
        columns = self.column_names(df)
        suffix_columns = [f"{column}{self.suffix}" for column in columns]
        df[suffix_columns] = df[columns].subtract(self._shift_df[columns])
        return df
class IntradayAveraging(BasePreprocess):
    """
    Data preprocessing class that calculates the difference between the current value and the average value of the same
    intraday interval over the past N days.

    Note: the result is NaN for the first `bins`*`window_days` rows (precisely: for rows whose index label is
    below ``min(index) + bins * window_days``, which assumes a numeric index). Filter them out before training.

    Parameters
    ----------
    columns : list
        List of columns to which preprocessing will be applied.
    window_days: int
        Number of days for averaging.
        Default: 5
    bins: int
        Number of intraday intervals.
        If None is specified, then it is determined automatically by the number of unique hh:mm buckets.
        Default: None
    datetime_column: str
        Name of the column that stores the datetime values.
        Default: 'Time'
    suffix: str
        Suffix to be added to the column names.
        Default: ''
    """
    refit_on_predict = True
    deprocessable = True

    def __init__(self,
                 columns: Union[list, str] = "__all__",
                 window_days=5,
                 bins=None,
                 datetime_column='Time',
                 suffix: str = ""):
        super().__init__(columns=columns,
                         window_days=window_days,
                         bins=bins,
                         datetime_column=datetime_column,
                         suffix=suffix)
        self.window_days = window_days
        self.bins = bins
        self.datetime_column = datetime_column
        # per-row average of the previous `window_days` values of the same time-of-day bucket
        self._agg_df = None
        self._columns = None

    def fit_pandas(self, df: pd.DataFrame):
        """Compute, for each row, the average of the same intraday interval over the previous days.

        ``df`` is modified in place: the datetime column is converted with ``pd.to_datetime``
        and a helper ``__hhmm`` column (time of day) is added.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.

        Returns
        -------
        pd.DataFrame
            The (modified) input dataframe.
        """
        columns = self.column_names(df)
        # Plain column assignment replaces the column together with its dtype. `df.loc[:, col] = ...`
        # may set values in place (pandas >= 2), leaving e.g. a string column as object dtype,
        # on which the `.dt` accessor below raises.
        df[self.datetime_column] = pd.to_datetime(df[self.datetime_column])
        df['__hhmm'] = df[self.datetime_column].dt.time
        window_days = self.window_days
        # Rolling mean over the previous `window_days` values of each intraday bucket.
        # The shift is done inside each group so values never leak from one bucket into another.
        agg_df = df.groupby(by='__hhmm')[columns].transform(
            lambda group: group.rolling(window_days).mean().shift(1)
        )
        if self.bins is None:
            # NOTE: the detected value is stored on the instance, so later fits reuse it.
            self.bins = df['__hhmm'].nunique()
        # TODO this cut assumes a numeric index; find a way to support other index types
        min_agg_index = self.bins * self.window_days + min(df.index)
        agg_df.loc[agg_df.index < min_agg_index, :] = np.nan
        self._agg_df = agg_df
        self._columns = columns
        return df

    def transform_pandas(self, df: pd.DataFrame):
        """Write ``column - intraday average`` into the target column of every selected column pair.

        Rows are aligned by index with the data seen in ``fit_pandas``.
        """
        assert self._agg_df is not None, "IntradayAveraging cannot be applied without being fitted on data first"
        pairs = list(self.iter_column_pairs(df))
        if not pairs:
            # nothing selected; `zip(*[])` would otherwise fail to unpack
            return df
        new_columns = [new_column for new_column, _ in pairs]
        columns = [column for _, column in pairs]
        df[new_columns] = df[columns] - self._agg_df[columns]
        return df
class GroupByColumn(BasePreprocess):
    """Apply a preprocessor separately to each group of rows.

    In the pandas path, rows are grouped by ``self.schema.group_column_name`` and a deep copy
    of ``preprocessor`` is fitted and applied for every group. The OneTick path
    (``fit_ot`` / ``transform_ot``) delegates to the wrapped preprocessor without grouping.

    Parameters
    ----------
    preprocessor : BasePreprocess
        Preprocessor to apply to every group.
    columns : list, str
        List of columns to which preprocessing will be applied.
        Default: "__all__"
    """

    def __init__(self,
                 preprocessor,
                 columns: Union[list, str] = "__all__",
                 ):
        super().__init__(
            preprocessor=preprocessor,
            columns=columns,
            suffix="")
        self.preprocessor = preprocessor
        # group keys seen during fit; None until fit_pandas has been called
        self._groups = None
        # group key -> fitted copy of `preprocessor`
        self._processors = {}

    def get_init_params(self):
        """Override get_init_params in order to replace GroupByColumn with nested preprocessor
        and add group_by_column to its dict.
        """
        _params = copy.deepcopy(self.preprocessor.get_init_params())
        _params['group_by_column'] = True
        return _params

    def fit_pandas(self, df: pd.DataFrame):
        """Fit an independent copy of the wrapped preprocessor on every group.

        Returns
        -------
        pd.DataFrame
            The input dataframe, unchanged.
        """
        processors = {}
        for group, group_df in df.groupby(self.schema.group_column_name):
            # clone preprocessor and save it for transform
            processor = copy.deepcopy(self.preprocessor)
            processor.schema = self.schema
            # fit on a copy so the wrapped preprocessor cannot modify `df` through a slice
            processor.fit(group_df.copy())
            processors[group] = processor
        # replace (not extend) state so groups from a previous fit do not linger
        self._processors = processors
        self._groups = list(processors)
        return df

    def transform_pandas(self, df: pd.DataFrame):
        """Transform every group of ``df`` with the preprocessor fitted on that group.

        Raises
        ------
        ValueError
            If ``df`` contains a group that was not seen during fit.
        """
        assert self._groups is not None, "GroupByColumn cannot be applied without being fitted on data first"
        # Group the dataframe being transformed (not the one seen during fit) and
        # materialise the groups before `df` is modified below.
        groups = list(df.groupby(self.schema.group_column_name))
        for group, group_df in groups:
            if group not in self._processors:
                raise ValueError(f"GroupByColumn: group {group!r} was not seen during fit")
            transformed_group = self._processors[group].transform(group_df.copy())
            # initialize new columns with NaNs so the row assignment below can fill them
            for new_column in set(transformed_group.columns) - set(df.columns):
                df[new_column] = np.nan
            df.loc[group_df.index] = transformed_group
        return df

    def fit_ot(self, src: otp.Source):
        """Delegate to the wrapped preprocessor (no per-group handling)."""
        return self.preprocessor.fit_ot(src)

    def transform_ot(self, src: otp.Source):
        """Delegate to the wrapped preprocessor (no per-group handling)."""
        return self.preprocessor.transform_ot(src)
class RefittableLabelEncoder(preprocessing.LabelEncoder):
    """
    Same as LabelEncoder, but can be refitted on new data without forgetting previously seen classes.

    SKLearn LabelEncoder is not refittable: ``fit`` replaces ``classes_`` with the labels of the
    new data only, and ``transform`` throws an error if it encounters classes unseen during fit.
    Here ``classes_`` is the sorted union of all labels passed to ``fit`` so far.

    Note: since ``classes_`` is kept sorted, the integer codes of previously seen classes
    may change when new classes are added by a refit.
    """

    def fit(self, y, **kwargs):
        """Fit on ``y`` while keeping the classes from earlier fits.

        Extra keyword arguments are accepted for API compatibility and ignored.
        """
        if hasattr(self, 'classes_'):
            # LabelEncoder.fit overwrites classes_ with the unique labels of its input,
            # so feed it the previously known classes together with the new labels.
            y = np.concatenate((self.classes_, np.ravel(y)))
        super().fit(y)
        return self

    def fit_transform(self, y, **kwargs):
        """Refit on ``y`` (keeping earlier classes) and return its encoded labels."""
        self.fit(y)
        return self.transform(y)