"""Source code for dsframework.impl.data_pipelines.preprocessors."""

import copy
import math
import numpy as np
import pandas as pd
from typing import Optional, Union
import onetick.py as otp

from sklearn import preprocessing
from dsframework.interfaces import BasePreprocess


class SKLearnPreprocessor(BasePreprocess):
    """Data preprocessing class that uses an sklearn preprocessor.

    Only preprocessors that do not change the number of columns can be used.

    Parameters
    ----------
    preprocessor_class : sklearn.preprocessing class
        Class of the sklearn preprocessor to be used.
    columns : list, str
        List of columns to which preprocessing will be applied.
        The string "__all__" means that all columns will be used.
        Default is "__all__".
    fit_on_train : bool
        If True, the preprocessor will be fitted only on train data.
        Default is False.
    params : dict
        Keyword arguments to be passed to the sklearn preprocessor constructor.
    suffix : str
        Suffix to be added to the column name after preprocessing.
        Default is None, which means that the suffix will be equal to the
        preprocessor class name in upper case.
    """

    def __init__(self,
                 preprocessor_class,
                 columns: Union[list, str] = "__all__",
                 params: Optional[dict] = None,
                 fit_on_train: bool = False,
                 group_by_column: bool = False,
                 suffix: str = ""):
        super().__init__(preprocessor_class=preprocessor_class,
                         columns=columns,
                         params=params,
                         fit_on_train=fit_on_train,
                         group_by_column=group_by_column,
                         suffix=suffix)
        self.preprocessor_class = preprocessor_class
        self.fit_on_train = fit_on_train
        self.params = params or {}
        self.preprocessors = {}

    def fit_pandas(self, df: pd.DataFrame):
        """Fits the selected preprocessor on data without transforming it.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.
        """
        for column in self.column_names(df):
            preprocessor = self.preprocessor_class(**self.params)
            if self.fit_on_train and "__SPLIT" in df.columns:
                data = df[df["__SPLIT"] == "TRAIN"][column]
            else:
                data = df[column]
            preprocessor.fit(data.values.reshape(-1, 1))
            self.preprocessors[column] = preprocessor
        return df

    def transform_pandas(self, df: pd.DataFrame):
        """Transform data using the selected sklearn preprocessor.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.
        """
        for new_column, column in self.iter_column_pairs(df):
            preprocessor = self.preprocessors[column]
            column_data = df[column].values.reshape(-1, 1)
            df[new_column] = preprocessor.transform(column_data)
        return df

    def inverse_transform(self, prediction_df: pd.DataFrame = None) -> pd.DataFrame:
        """Deprocess data using the fitted sklearn preprocessors.

        Parameters
        ----------
        prediction_df : pd.DataFrame, optional
            Dataframe with predictions, by default None.

        Returns
        -------
        pd.DataFrame or None
            Deprocessed dataframe, or None if `prediction_df` is None.
        """
        if prediction_df is None:
            return None
        _df = prediction_df.copy(deep=True)
        for column, preprocessor in self.preprocessors.items():
            new_column = self.get_new_column_name(column)
            if new_column not in _df.columns:
                continue
            column_data = _df[new_column].values.reshape(-1, 1)
            _df[column] = preprocessor.inverse_transform(column_data)
        return _df
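

# A minimal usage sketch (not part of the original module): wrapping sklearn's
# StandardScaler with SKLearnPreprocessor. It assumes BasePreprocess turns the
# `columns`/`suffix` arguments into column_names() and iter_column_pairs() as
# the methods above suggest; the dataframe and column names are made up.
def _example_sklearn_preprocessor():
    df = pd.DataFrame({"PRICE": [1.0, 2.0, 3.0, 4.0]})
    op = SKLearnPreprocessor(preprocessing.StandardScaler,
                             columns=["PRICE"], suffix="_SCALED")
    df = op.fit_pandas(df)          # fits one StandardScaler per column
    return op.transform_pandas(df)  # adds a PRICE_SCALED column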


class ApplyLog(BasePreprocess):
    """Data preprocessing class that applies a logarithm to the data.

    Parameters
    ----------
    columns : list
        List of columns to which preprocessing will be applied.
    base : float
        Base of the logarithm. Default: math.e
    suffix : str
        Suffix to be added to the column name after preprocessing.
        Default is "", which means that preprocessing will not create a new
        column and the logarithm will be applied to the original column.
    """

    refit_on_predict = True
    deprocessable = True

    def __init__(self,
                 base: Optional[float] = None,
                 columns: Union[list, str] = "__all__",
                 suffix: str = ""):
        super().__init__(columns=columns, base=base, suffix=suffix)
        self.base = math.e if base is None else base
        self.log_base = np.log(self.base)

    def transform_ot(self, src: otp.Source):
        self._columns = {}
        for new_column, column in self.iter_column_pairs(src):
            # change of base: log_b(x) = ln(x) / ln(b)
            src[new_column] = otp.math.ln(src[column]) / otp.math.ln(self.base)
            self._columns[column] = self.base
        return src

    def transform_pandas(self, df: pd.DataFrame):
        """Preprocess data by applying a logarithm.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.
        """
        self._columns = {}
        new_columns, columns = zip(*self.iter_column_pairs(df))
        df[list(new_columns)] = np.log(df[list(columns)]) / self.log_base
        self._columns.update(zip(columns, [self.base] * len(columns)))
        return df

    def inverse_transform(self, prediction_df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """Reverse process data by applying the exponential function.

        Parameters
        ----------
        prediction_df : pd.DataFrame, optional
            Dataframe with predictions, by default None.

        Returns
        -------
        pd.DataFrame or None
            Reverse processed dataframe with predictions,
            or None if `prediction_df` is None.
        """
        if prediction_df is None:
            return None
        _df = prediction_df.copy(deep=True)
        # todo: which is less: _df.columns or self._columns_map?
        columns = [column for column in _df.columns if column in self._columns_map]
        original_columns = [self._columns_map[column] for column in columns]
        base_series = pd.Series(self._columns)
        _df[original_columns] = base_series[original_columns].values ** _df[columns].values
        return _df
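

# A minimal usage sketch (not part of the original module): ApplyLog with
# base 10. Per the class docstring, the default empty suffix transforms the
# column in place; log10 of [1, 10, 100] is exactly [0, 1, 2], and
# inverse_transform() raises the stored base back: base ** value.
def _example_apply_log():
    df = pd.DataFrame({"PRICE": [1.0, 10.0, 100.0]})
    op = ApplyLog(base=10, columns=["PRICE"])
    return op.transform_pandas(df)  # PRICE becomes [0.0, 1.0, 2.0]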


class LimitOutliers(BasePreprocess):
    """Data preprocessing class that limits outliers using standard deviations.

    The maximum and minimum allowable values are calculated with the formula
    `mean ± std_num * std`, where `mean` and `std` are the mean value and
    standard deviation calculated on the training set.

    Parameters
    ----------
    columns : list
        List of columns to which preprocessing will be applied.
    std_num : float
        The number of standard deviations used to limit outliers. Default: 3
    """

    refit_on_predict = False
    deprocessable = False

    def __init__(self,
                 std_num: float = 3,
                 columns: Union[list, str] = "__all__",
                 suffix: str = ""):
        super().__init__(std_num=std_num, columns=columns, suffix=suffix)
        self.std_num = std_num
        self.fitted = False
        self._columns = {}

    def fit_pandas(self, df: pd.DataFrame):
        """Fits the preprocessor on data without transforming it.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.
        """
        self._columns = {}
        if "__SPLIT" in df.columns:
            _df = df[df["__SPLIT"] == "TRAIN"]
        else:
            _df = df
        columns = self.column_names(_df)
        means = _df[columns].mean(numeric_only=True)
        stds = _df[columns].std(numeric_only=True)
        for column in columns:
            self._columns[column] = {
                'up_border': means[column] + self.std_num * stds[column],
                'down_border': means[column] - self.std_num * stds[column],
            }
        self.fitted = True
        return df

    def transform_pandas(self, df: pd.DataFrame):
        """Process data by limiting (capping) outliers.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.
        """
        for new_column, column in self.iter_column_pairs(df):
            if column not in self._columns:
                raise ValueError(f'Column {column} is not fitted by LimitOutliers. '
                                 f'Please fit data before transform.')
            up_border = self._columns[column]['up_border']
            down_border = self._columns[column]['down_border']
            df[new_column] = df[column].clip(lower=down_border, upper=up_border)
        return df

    def transform_ot(self, src: otp.Source):
        columns = self.column_names(src)
        agg_dict = {}
        for column in columns:
            agg_dict[f"__{column}_MEAN"] = otp.agg.mean(column)
            agg_dict[f"__{column}_STD"] = otp.agg.stddev(column, biased=False)
        if self.schema.set_name in src.schema:
            # fit only on the train set, if splitting is used
            data_agg, _ = src[src[self.schema.set_name] == "TRAIN"]
        else:
            data_agg = src
        data_agg = data_agg.agg(agg_dict, bucket_time='start')
        for column in columns:
            data_agg[f"__{column}_LOWER"] = (data_agg[f"__{column}_MEAN"]
                                             - self.std_num * data_agg[f"__{column}_STD"])
            data_agg[f"__{column}_HIGHER"] = (data_agg[f"__{column}_MEAN"]
                                              + self.std_num * data_agg[f"__{column}_STD"])
            # data_agg.drop(columns=[f"__{column}_MEAN", f"__{column}_STD"], inplace=True)
        src = otp.join_by_time([src, data_agg], policy="LATEST_TICKS")
        for column in columns:
            src[f"{column}{self.suffix}"] = src[column].apply(
                lambda x: data_agg[f"__{column}_LOWER"] if x < data_agg[f"__{column}_LOWER"]
                else data_agg[f"__{column}_HIGHER"] if x > data_agg[f"__{column}_HIGHER"]
                else x
            )
        return src
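

# A minimal usage sketch (not part of the original module): LimitOutliers with
# a "__SPLIT" column, so the borders `mean ± std_num * std` are computed on
# the TRAIN rows only and the TEST outlier gets capped. Data is made up.
def _example_limit_outliers():
    df = pd.DataFrame({
        "VOLUME":  [1.0, 2.0, 1.5, 2.5, 1.8, 100.0],
        "__SPLIT": ["TRAIN"] * 5 + ["TEST"],
    })
    op = LimitOutliers(std_num=3, columns=["VOLUME"])
    df = op.fit_pandas(df)          # borders from the five TRAIN rows only
    return op.transform_pandas(df)  # 100.0 is clipped to the upper border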


class MinMaxScaler(SKLearnPreprocessor):
    """Data preprocessing class that scales data to a given range.

    Parameters
    ----------
    columns : list, optional
        List of columns to which preprocessing will be applied.
        By default None.
    transformed_range : tuple
        Desired range of the transformed data. Default: (0, 1)
    """

    refit_on_predict = False
    deprocessable = True

    def __init__(self,
                 columns: Optional[list] = None,
                 transformed_range: tuple = (0, 1),
                 suffix: str = "",
                 group_by_column: bool = False):
        self.transformed_range = (0, 1) if transformed_range is None else transformed_range
        super().__init__(
            preprocessor_class=preprocessing.MinMaxScaler,
            fit_on_train=True,
            columns=columns,
            suffix=suffix,
            group_by_column=group_by_column,
            params=dict(feature_range=self.transformed_range),
        )
        # hack to save init params specifically for the current class, not for the parent class
        self.save_init_params(dict(
            transformed_range=self.transformed_range,
            columns=columns,
        ))

    def fit_ot(self, src: otp.Source):
        columns = self.column_names(src)
        agg_dict = {}
        for column in columns:
            agg_dict[f"__{column}_MIN"] = otp.agg.min(column)
            agg_dict[f"__{column}_MAX"] = otp.agg.max(column)
        if self.schema.set_name in src.schema:
            # fit only on the train set, if splitting is used
            data_agg, _ = src[src[self.schema.set_name] == "TRAIN"]
        else:
            data_agg = src
        data_agg = data_agg.agg(agg_dict, bucket_time='start')
        src = otp.join_by_time([src, data_agg], policy="LATEST_TICKS")
        # save min and max values in separate columns for inverse transform
        self.schema.utility_columns += [f"__{column}_MIN" for column in columns]
        self.schema.utility_columns += [f"__{column}_MAX" for column in columns]
        return src

    def transform_ot(self, src: otp.Source):
        a = self.transformed_range[0]
        b = self.transformed_range[1]
        for new_column, column in self.iter_column_pairs(src):
            src[new_column] = src[column]
            src[new_column] = src[column].apply(
                lambda x: a + (x - src[f"__{column}_MIN"]) * (b - a)
                / (src[f"__{column}_MAX"] - src[f"__{column}_MIN"]))
        return src

    def inverse_transform(self, prediction_df: pd.DataFrame = None) -> pd.DataFrame:
        a = self.transformed_range[0]
        b = self.transformed_range[1]
        _df = prediction_df.copy()
        for new_column, column in self._columns_map.items():
            if new_column not in _df.columns:
                continue
            cmin = f"__{column}_MIN"
            cmax = f"__{column}_MAX"
            if cmin in _df.columns and cmax in _df.columns:
                _df[column] = (_df[cmax] - _df[cmin]) / (b - a) * (_df[new_column] - a) + _df[cmin]
            else:
                _df[column] = self.preprocessors[column].inverse_transform(
                    _df[new_column].values.reshape(-1, 1))
        return _df
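

# A minimal usage sketch (not part of the original module): in the pandas path
# MinMaxScaler delegates to sklearn.preprocessing.MinMaxScaler through
# SKLearnPreprocessor, so [0, 5, 10] scaled to (0, 1) is exactly [0, 0.5, 1].
def _example_min_max_scaler():
    df = pd.DataFrame({"PRICE": [0.0, 5.0, 10.0]})
    op = MinMaxScaler(columns=["PRICE"], transformed_range=(0, 1))
    df = op.fit_pandas(df)          # no "__SPLIT" column, so fits on all rows
    return op.transform_pandas(df)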


class LaggedDifferences(BasePreprocess):
    """Data preprocessing class that calculates the difference between the
    current and the lagged values of a time series.

    Note: this preprocessing makes the first `lag` rows contain NaN values.
    Filter them out before training.

    Parameters
    ----------
    columns : list
        List of columns to which preprocessing will be applied.
    lag : int
        Value of the lag to be used. This value equals the number of rows
        that will be removed from the beginning of the dataframe.
        Default: 39
    """

    refit_on_predict = True
    deprocessable = True

    def __init__(self,
                 lag: int = 39,
                 columns: Union[list, str] = "__all__",
                 suffix: str = ""):
        super().__init__(columns=columns, lag=lag, suffix=suffix)
        self.lag = lag
        self.suffix = suffix
        self._shift_df = None

    def transform_ot(self, src: otp.Source):
        """Calculates lagged differences for the given columns.

        Parameters
        ----------
        src : otp.Source
            Source to calculate lagged differences for.

        Returns
        -------
        otp.Source
            Source with the calculated lagged differences in new columns.
        """
        for new_col, col in self.iter_column_pairs(src):
            # save lagged values in separate columns
            # to use them in the inverse_transform() method
            utility_column = f'__LAGGED_{self.lag}_{col}'
            src[utility_column] = src[col][-self.lag]
            src[new_col] = src[col] - src[utility_column]
            self.schema.utility_columns += [utility_column]
        return src

    def fit_pandas(self, df: pd.DataFrame):
        """Fits the preprocessor by storing the values of the selected
        columns shifted by `lag` rows.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.
        """
        self._shift_df = df.loc[:, self.column_names(df)].shift(self.lag).copy(deep=True)
        return df

    def transform_pandas(self, df: pd.DataFrame):
        if self._shift_df is None:
            raise RuntimeError("LaggedDifferences cannot be applied without being fitted on data first")
        columns = self.column_names(df)
        suffix_columns = [f"{column}{self.suffix}" for column in columns]
        df[suffix_columns] = df[columns].subtract(self._shift_df[columns])
        return df

    def inverse_transform(self, prediction_df: pd.DataFrame = None) -> pd.DataFrame:
        """Reverse process data by adding the lagged values to the
        corresponding prediction values.

        Parameters
        ----------
        prediction_df : pd.DataFrame
            Dataframe to be deprocessed.

        Returns
        -------
        pd.DataFrame or None
            Deprocessed dataframe with the same shape as the input dataframe,
            or None if `prediction_df` is None.
        """
        if prediction_df is None:
            return None
        if self._shift_df is None:
            # if the operator transformed data in the onetick context,
            # then we should extract _shift_df from the utility columns
            self._shift_df = pd.DataFrame()
            for column, original_column in self._columns_map.items():
                utility_column = f'__LAGGED_{self.lag}_{original_column}'
                if utility_column in prediction_df.columns:
                    self._shift_df[original_column] = prediction_df[utility_column]
                else:
                    raise RuntimeError(
                        "LaggedDifferences cannot deprocess without being fitted on all columns first")
        _df = prediction_df.copy(deep=True)
        # todo: which is less: _df.columns or self._columns_map?
        columns = [column for column in _df.columns if column in self._columns_map]
        original_columns = [self._columns_map[column] for column in columns]
        _df[original_columns] = _df[columns].values + self._shift_df.loc[_df.index, original_columns].values
        return _df
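

# A minimal usage sketch (not part of the original module): LaggedDifferences
# with lag=1 turns each value into its difference from the previous row; the
# first `lag` rows become NaN, as the class docstring warns.
def _example_lagged_differences():
    df = pd.DataFrame({"PRICE": [1.0, 3.0, 6.0, 10.0]})
    op = LaggedDifferences(lag=1, columns=["PRICE"])
    df = op.fit_pandas(df)          # stores the values shifted by `lag`
    return op.transform_pandas(df)  # PRICE becomes [NaN, 2.0, 3.0, 4.0]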


class IntradayAveraging(BasePreprocess):
    """Data preprocessing class that calculates the difference between the
    current value and the average value of the same intraday interval over
    the past N days.

    Note: this preprocessing removes the first `bins` * `window_days` rows
    from the dataframe.

    Parameters
    ----------
    columns : list
        List of columns to which preprocessing will be applied.
    window_days : int
        Number of days for averaging. Default: 5
    bins : int
        Number of intraday intervals. If None is specified, then it is
        determined automatically by the number of unique hh:mm buckets.
        Default: None
    datetime_column : str
        Name of the column that stores the datetime values. Default: 'Time'
    suffix : str
        Suffix to be added to the column names. Default: ''
    """

    refit_on_predict = True
    deprocessable = True

    def __init__(self,
                 columns: Union[list, str] = "__all__",
                 window_days=5,
                 bins=None,
                 datetime_column='Time',
                 suffix: str = ""):
        super().__init__(columns=columns, window_days=window_days, bins=bins,
                         datetime_column=datetime_column, suffix=suffix)
        self.window_days = window_days
        self.bins = bins
        self.datetime_column = datetime_column
        self._agg_df = None
        self._columns = None

    def fit_pandas(self, df: pd.DataFrame):
        """Preprocess (time series) data by calculating the difference between
        the current value and the average value of the same intraday interval
        in the past period of time.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.
        """
        columns = self.column_names(df)
        df.loc[:, self.datetime_column] = pd.to_datetime(df[self.datetime_column])
        df.loc[:, '__hhmm'] = df[self.datetime_column].dt.time
        agg_df = df[columns + ['__hhmm']].copy(deep=True)
        agg_df = agg_df.groupby(by='__hhmm').rolling(self.window_days).mean()
        agg_df = agg_df.shift(1).reset_index(level=0).sort_index()
        # TODO: the cut below assumes a numeric index; find a better way
        # to drop the incorrectly shifted values
        if self.bins is None:
            self.bins = df['__hhmm'].nunique()
        min_agg_index = self.bins * self.window_days + min(df.index)
        agg_df.loc[agg_df.index < min_agg_index, :] = np.nan
        self._agg_df = agg_df
        self._columns = columns
        return df

    def transform_pandas(self, df: pd.DataFrame):
        assert self._agg_df is not None, "IntradayAveraging cannot be applied without being fitted on data first"
        new_columns, columns = zip(*self.iter_column_pairs(df))
        df[list(new_columns)] = df[list(columns)] - self._agg_df[list(columns)]
        return df

    def inverse_transform(self, prediction_df: pd.DataFrame = None) -> pd.DataFrame:
        """Reverse process data by adding the average value of the intraday
        intervals to the corresponding prediction values.

        Parameters
        ----------
        prediction_df : pd.DataFrame
            Dataframe to be deprocessed.

        Returns
        -------
        pd.DataFrame or None
            Reverse processed dataframe with the same shape as the input
            dataframe, or None if `prediction_df` is None.
        """
        assert self._agg_df is not None, "IntradayAveraging cannot deprocess without being fitted on data first"
        if prediction_df is None:
            return None
        _df = prediction_df.copy(deep=True)
        # todo: which is less: _df.columns or self._columns_map?
        columns = [column for column in _df.columns if column in self._columns_map]
        original_columns = [self._columns_map[column] for column in columns]
        _df[original_columns] = _df[columns].values + self._agg_df[original_columns].values
        return _df
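

# A minimal usage sketch (not part of the original module): IntradayAveraging
# over a series with two intraday buckets (10:00 and 22:00) and a two-day
# window. With bins auto-detected as 2, the first bins * window_days = 4 rows
# come out as NaN; the data and column names are made up.
def _example_intraday_averaging():
    times = pd.date_range("2023-01-02 10:00", periods=6, freq="12h")
    df = pd.DataFrame({"Time": times, "PRICE": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]})
    op = IntradayAveraging(columns=["PRICE"], window_days=2)
    df = op.fit_pandas(df)          # builds the per-bucket rolling averages
    return op.transform_pandas(df)  # the first 4 rows are NaN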


class GroupByColumn(BasePreprocess):
    """Data preprocessing class that applies a nested preprocessor separately
    to each group of rows defined by the schema's group column.
    """

    def __init__(self,
                 preprocessor,
                 columns: Union[list, str] = "__all__"):
        super().__init__(preprocessor=preprocessor, columns=columns, suffix="")
        self.preprocessor = preprocessor
        self._groups = None
        self._processors = {}

    def get_init_params(self):
        """Override get_init_params in order to replace GroupByColumn with the
        nested preprocessor and add group_by_column to its parameters dict.
        """
        _params = copy.deepcopy(self.preprocessor.get_init_params())
        _params['group_by_column'] = True
        return _params

    def fit_pandas(self, df: pd.DataFrame):
        self._groups = df.groupby(self.schema.group_column_name)
        for group, group_df in self._groups:
            # clone the preprocessor and save it for transform
            self._processors[group] = copy.deepcopy(self.preprocessor)
            self._processors[group].schema = self.schema
            # fit each preprocessor on a particular ticker
            # todo: do we need the copy below?
            self._processors[group].fit(group_df.copy())
        return df

    def transform_pandas(self, df: pd.DataFrame):
        assert self._groups is not None, "GroupByColumn cannot be applied without being fitted on data first"
        # apply transform to each group
        for group, group_df in self._groups:
            transformed_group = self._processors[group].transform(group_df)
            # initialize new columns with NaNs
            new_columns = set(transformed_group.columns) - set(df.columns)
            for new_column in new_columns:
                if new_column not in df.columns:
                    df[new_column] = np.nan
            df.loc[group_df.index] = transformed_group
        return df

    def fit_ot(self, src: otp.Source):
        return self.preprocessor.fit_ot(src)

    def transform_ot(self, src: otp.Source):
        return self.preprocessor.transform_ot(src)

    def inverse_transform(self, prediction_df: pd.DataFrame = None) -> pd.DataFrame:
        if self.preprocessor.deprocessable is False:
            return prediction_df
        assert self._groups is not None, "GroupByColumn cannot be applied without being fitted on data first"
        if prediction_df is None:
            return None
        _df = prediction_df.copy(deep=True)
        _df = _df.groupby(self.schema.group_column_name, group_keys=False).apply(
            lambda x: self._processors[x.name].inverse_transform(x))
        # drop group column index
        # todo: check if it is correct???
        # _df.index = _df.index.droplevel(0)
        _df = _df.sort_index()
        return _df
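

# A minimal usage sketch (not part of the original module): GroupByColumn
# deep-copies the nested preprocessor per group and fits each copy separately.
# The schema is normally injected by the framework; the SimpleNamespace below
# is a hypothetical stand-in for it, and the fit()/transform() dispatch to the
# *_pandas methods is assumed from how GroupByColumn itself calls them.
def _example_group_by_column():
    from types import SimpleNamespace
    df = pd.DataFrame({"TICKER": ["A", "A", "B", "B"],
                       "PRICE":  [1.0, 2.0, 100.0, 200.0]})
    op = GroupByColumn(ApplyLog(base=10, columns=["PRICE"]))
    op.schema = SimpleNamespace(group_column_name="TICKER")  # hypothetical wiring
    df = op.fit_pandas(df)          # one ApplyLog copy per ticker
    return op.transform_pandas(df)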


class RefittableLabelEncoder(preprocessing.LabelEncoder):
    """Same as LabelEncoder, but can be fitted on new data without losing the
    mapping of the old data.

    The sklearn LabelEncoder is not refittable, because it throws an error if
    new classes are encountered during transform.
    """

    def fit(self, y, **kwargs):
        if hasattr(self, 'classes_'):
            # merge the already known classes with the new ones;
            # calling super().fit() here would overwrite the merged mapping
            self.classes_ = np.unique(np.concatenate((self.classes_, np.unique(y))))
        else:
            super().fit(y)
        return self

    def fit_transform(self, y, **kwargs):
        self.fit(y)
        return self.transform(y)
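

# A minimal usage sketch (not part of the original module): refitting a
# RefittableLabelEncoder extends the known classes instead of raising on
# unseen labels. np.unique keeps classes_ sorted, so transform() still works,
# but the integer assigned to an old class may shift after a refit.
def _example_refittable_label_encoder():
    enc = RefittableLabelEncoder()
    enc.fit(["AAPL", "MSFT"])
    enc.fit(["GOOG"])  # a plain LabelEncoder would lose the AAPL/MSFT mapping here
    return enc.transform(["AAPL", "GOOG", "MSFT"])  # array([0, 1, 2])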