class SKLearnPreprocessor(BasePreprocess):
    """Data preprocessing class that uses sklearn preprocessor.

    Could use only preprocessors that do not change the number of columns.

    Parameters
    ----------
    preprocessor_class : sklearn.preprocessing class
        Class of sklearn preprocessor to be used.
    columns : list, str
        List of columns to which preprocessing will be applied.
        String "__all__" means that all columns will be used.
        Default is "__all__".
    fit_on_train : bool
        If True, then preprocessor will be fitted only on train data.
        Default is False.
    params : dict
        Keyword arguments to be passed to sklearn preprocessor constructor.
    group_by_column : bool
        Passed through to ``BasePreprocess``; presumably enables per-group
        application of the preprocessor (see ``GroupByColumn``) — TODO confirm.
        Default is False.
    suffix : str
        Suffix to be added to column name after preprocessing.
        Default is "".
        NOTE(review): the docstring previously claimed the default is None with
        an auto-generated upper-case suffix; the signature default is "" —
        confirm which behavior is intended.
    """

    def __init__(self, preprocessor_class, columns: Union[list, str] = "__all__",
                 params: Optional[dict] = None, fit_on_train: bool = False,
                 group_by_column: bool = False, suffix: str = ""):
        # BasePreprocess records the init params (used e.g. by get_init_params)
        super().__init__(preprocessor_class=preprocessor_class, columns=columns,
                         params=params, fit_on_train=fit_on_train,
                         group_by_column=group_by_column, suffix=suffix)
        self.preprocessor_class = preprocessor_class
        self.fit_on_train = fit_on_train
        self.params = params or {}
        # per-column fitted sklearn preprocessor instances, filled by fit_pandas()
        self.preprocessors = {}
[docs]deffit_pandas(self,df:pd.DataFrame):"""Fits selected preprocessor on data without transforming it. Parameters ---------- df : pd.DataFrame Dataframe to be preprocessed. columns : list, optional List of columns to which preprocessing will be applied. If set to None, then all columns will be used. By default None. """forcolumninself.column_names(df):preprocessor=self.preprocessor_class(**self.params)ifself.fit_on_trainand"__SPLIT"indf.columns:data=df[df["__SPLIT"]=="TRAIN"][column]else:data=df[column]preprocessor.fit(data.values.reshape(-1,1))self.preprocessors[column]=preprocessorreturndf
[docs]deftransform_pandas(self,df:pd.DataFrame):"""Transform data using selected sklearn preprocessor. Parameters ---------- df : pd.DataFrame Dataframe to be preprocessed. columns : list, optional List of columns to which preprocessing will be applied. If set to None, then all columns will be used. By default None. """fornew_column,columninself.iter_column_pairs(df):preprocessor=self.preprocessors[column]column_data=df[column].values.reshape(-1,1)df[new_column]=preprocessor.transform(column_data)returndf
[docs]definverse_transform(self,prediction_df:pd.DataFrame=None)->pd.DataFrame:"""Deprocess data using sklearn preprocessor. Parameters ---------- df : pd.DataFrame Dataframe to be preprocessed. Returns ------- pd.DataFrame Deprocessed dataframe. """ifprediction_dfisNone:returnNone_df=prediction_df.copy(deep=True)forcolumn,preprocessorinself.preprocessors.items():new_column=self.get_new_column_name(column)ifnew_columnnotin_df.columns:continuecolumn_data=_df[new_column].values.reshape(-1,1)_df[column]=preprocessor.inverse_transform(column_data)return_df
class ApplyLog(BasePreprocess):
    """
    Data preprocessing class that logarithms data.

    Parameters
    ----------
    columns : list
        List of columns to which preprocessing will be applied
    base : float
        Base of the logarithm. Default: math.e
    suffix : str
        Suffix to be added to column name after preprocessing. Default is "",
        which means that preprocessing will not create new column and apply
        logarithm to the original column.
    """
    # must be re-applied when predicting on new data
    refit_on_predict = True
    # the transform is invertible (see inverse_transform)
    deprocessable = True

    def __init__(self, base: Optional[float] = None,
                 columns: Union[list, str] = "__all__", suffix: str = ""):
        super().__init__(columns=columns, base=base, suffix=suffix)
        self.base = math.e if base is None else base
        # cache ln(base): log_b(x) == ln(x) / ln(b)
        self.log_base = np.log(self.base)

    def transform_ot(self, src: otp.Source):
        # OneTick-side counterpart of transform_pandas
        self._columns = {}
        for new_column, column in self.iter_column_pairs(src):
            # change-of-base formula, expressed with OneTick's natural log
            src[new_column] = otp.math.ln(src[column]) / otp.math.ln(self.base)
            # remember the base per original column for inverse_transform
            self._columns[column] = self.base
        return src
[docs]deftransform_pandas(self,df:pd.DataFrame):"""Preprocess data by applying logarithm. Parameters ---------- df : pd.DataFrame Dataframe to be preprocessed. """self._columns={}new_columns,columns=zip(*self.iter_column_pairs(df))df[list(new_columns)]=np.log(df[list(columns)])/self.log_baseself._columns.update(zip(columns,[self.base]*len(columns)))returndf
[docs]definverse_transform(self,prediction_df:Optional[pd.DataFrame]=None)->pd.DataFrame:"""Reverse process data by applying the exponential function. Parameters ---------- prediction_df : pd.DataFrame, optional Dataframe with predictions, by default None Returns ------- pd.DataFrame or None Reverse processed dataframe with predictions or None if `prediction_df` is None. """ifprediction_dfisNone:returnNone_df=prediction_df.copy(deep=True)# todo: which is less: _df.columns or self._columns_map?columns=[columnforcolumnin_df.columnsifcolumninself._columns_map]original_columns=[self._columns_map[column]forcolumnincolumns]base_series=pd.Series(self._columns)_df[original_columns]=base_series[original_columns].values**_df[columns].valuesreturn_df
class LimitOutliers(BasePreprocess):
    """
    Data preprocessing class that limits outliers by using standard deviations.

    The maximum and minimum allowable values are calculated using the formula:
    `mean ± std_num * std`, where `mean` and `std` are the mean value and
    standard deviation calculated on the training set.

    Parameters
    ----------
    columns : list
        List of columns to which preprocessing will be applied
    std_num : float
        The number of standard deviations used to limit outliers. Default: 3
    suffix : str
        Suffix to be added to column name after preprocessing. Default is "".
    """
    # capping is a one-way operation: nothing to re-fit or invert
    refit_on_predict = False
    deprocessable = False

    def __init__(self, std_num: float = 3, columns: Union[list, str] = "__all__",
                 suffix: str = ""):
        super().__init__(std_num=std_num, columns=columns, suffix=suffix)
        self.std_num = std_num
        self.fitted = False
        # per-column {'up_border': ..., 'down_border': ...}, filled by fit_pandas()
        self._columns = {}
[docs]deffit_pandas(self,df:pd.DataFrame):"""Fits selected preprocessor on data without transforming it. Parameters ---------- df : pd.DataFrame Dataframe to be preprocessed. """self._columns={}if"__SPLIT"indf.columns:_df=df[df["__SPLIT"]=="TRAIN"]else:_df=dfcolumns=self.column_names(_df)means=_df[columns].mean(numeric_only=True)stds=_df[columns].std(numeric_only=True)forcolumnincolumns:self._columns[column]={'up_border':means[column]+self.std_num*stds[column],'down_border':means[column]-self.std_num*stds[column]}self.fitted=Truereturndf
[docs]deftransform_pandas(self,df:pd.DataFrame):"""Process data by limiting (capping) outliers. Parameters ---------- df : pd.DataFrame Dataframe to be preprocessed. columns : list, optional List of columns to which preprocessing will be applied. If set to None, then all columns will be used. By default None. """fornew_column,columninself.iter_column_pairs(df):ifcolumnnotinself._columns:raiseValueError(f'Column {column} is not fitted by LimitOutliers. Please fit data before transform.')up_border=self._columns[column]['up_border']down_border=self._columns[column]['down_border']df[new_column]=df[column].clip(lower=down_border,upper=up_border)returndf
    def transform_ot(self, src: otp.Source):
        # OneTick-side capping: compute mean/std aggregates, join them back onto
        # the tick stream, then clip each value to [LOWER, HIGHER].
        columns = self.column_names(src)
        agg_dict = {}
        for column in columns:
            agg_dict[f"__{column}_MEAN"] = otp.agg.mean(column)
            agg_dict[f"__{column}_STD"] = otp.agg.stddev(column, biased=False)
        if self.schema.set_name in src.schema:
            # fit only on train set, if splitting is used
            data_agg, _ = src[src[self.schema.set_name] == "TRAIN"]
        else:
            data_agg = src
        data_agg = data_agg.agg(agg_dict, bucket_time='start')
        for column in columns:
            # borders: mean ± std_num * std (same formula as the pandas path)
            data_agg[f"__{column}_LOWER"] = data_agg[f"__{column}_MEAN"] - \
                self.std_num * data_agg[f"__{column}_STD"]
            data_agg[f"__{column}_HIGHER"] = data_agg[f"__{column}_MEAN"] + \
                self.std_num * data_agg[f"__{column}_STD"]
            # data_agg.drop(columns=[f"__{column}_MEAN", f"__{column}_STD"], inplace=True)
        src = otp.join_by_time([src, data_agg], policy="LATEST_TICKS")
        for column in columns:
            # NOTE(review): the lambda closes over the loop variable `column`;
            # presumably otp evaluates it immediately inside apply(), so each
            # iteration binds the right column — confirm against onetick-py docs.
            src[f"{column}{self.suffix}"] = src[column].apply(
                lambda x: data_agg[f"__{column}_LOWER"] if x < data_agg[f"__{column}_LOWER"]
                else data_agg[f"__{column}_HIGHER"] if x > data_agg[f"__{column}_HIGHER"]
                else x)
        return src
class MinMaxScaler(SKLearnPreprocessor):
    """
    Data preprocessing class that scales data to a given range.

    Parameters
    ----------
    columns : list, optional
        List of columns to which preprocessing will be applied. By default None.
        NOTE(review): the parent default is "__all__"; confirm how
        ``BasePreprocess`` treats a None column selection.
    transformed_range : tuple
        Desired range of transformed data. Default: (0, 1)
    suffix : str
        Suffix to be added to column name after preprocessing. Default is "".
    group_by_column : bool
        Passed through to the parent class. Default is False.
    """
    refit_on_predict = False
    deprocessable = True

    def __init__(self, columns: Optional[list] = None, transformed_range: tuple = (0, 1),
                 suffix: str = "", group_by_column: bool = False):
        self.transformed_range = (0, 1) if transformed_range is None else transformed_range
        # delegate fitting/transforming in pandas context to sklearn's MinMaxScaler;
        # fit_on_train=True so scaling never leaks statistics from the test split
        super().__init__(preprocessor_class=preprocessing.MinMaxScaler, fit_on_train=True,
                         columns=columns, suffix=suffix, group_by_column=group_by_column,
                         params=dict(feature_range=self.transformed_range))
        # hack to save init params specifically for current class, not for the parent class
        self.save_init_params(dict(transformed_range=self.transformed_range, columns=columns))

    def fit_ot(self, src: otp.Source):
        # OneTick-side fit: compute per-column min/max and join them onto the stream
        columns = self.column_names(src)
        agg_dict = {}
        for column in columns:
            agg_dict[f"__{column}_MIN"] = otp.agg.min(column)
            agg_dict[f"__{column}_MAX"] = otp.agg.max(column)
            # src[new_column] = src[column].apply(lambda x: )
        if self.schema.set_name in src.schema:
            # fit only on train set, if splitting is used
            data_agg, _ = src[src[self.schema.set_name] == "TRAIN"]
        else:
            data_agg = src
        data_agg = data_agg.agg(agg_dict, bucket_time='start')
        src = otp.join_by_time([src, data_agg], policy="LATEST_TICKS")
        # save min and max values in a separate column for inverse transform
        self.schema.utility_columns += [f"__{column}_MIN" for column in columns]
        self.schema.utility_columns += [f"__{column}_MAX" for column in columns]
        return src

    def transform_ot(self, src: otp.Source):
        # classic min-max scaling: a + (x - min) * (b - a) / (max - min)
        a = self.transformed_range[0]
        b = self.transformed_range[1]
        for new_column, column in self.iter_column_pairs(src):
            src[new_column] = src[column]
            src[new_column] = src[column].apply(
                lambda x: a + (x - src[f"__{column}_MIN"]) * (b - a)
                / (src[f"__{column}_MAX"] - src[f"__{column}_MIN"]))
        return src
class LaggedDifferences(BasePreprocess):
    """
    Data preprocessing class that calculates the difference between the current
    and lag values (of time series).

    Note: this preprocessing will make first `lag` rows contain NaN values.
    Filter them out before training.

    Parameters
    ----------
    columns : list
        List of columns to which preprocessing will be applied.
    lag : int
        Value of the lag to be used. This value equals how many rows will be
        removed from the beginning of the dataframe. Default: 39
    suffix : str
        Suffix to be added to column name after preprocessing. Default is "".
    """
    # differencing depends on the most recent values, so refit on every predict
    refit_on_predict = True
    deprocessable = True

    def __init__(self, lag: int = 39, columns: Union[list, str] = "__all__",
                 suffix: str = ""):
        super().__init__(columns=columns, lag=lag, suffix=suffix)
        self.lag = lag
        self.suffix = suffix
        # dataframe of lag-shifted source columns, filled by fit_pandas()
        self._shift_df = None
    def transform_ot(self, src: otp.Source):
        """
        Calculates lagged differences for the given columns.

        Parameters
        ----------
        src : otp.Source
            Source to calculate lagged differences for.

        Returns
        -------
        otp.Source
            Source with calculated lagged differences in new columns.
        """
        for new_col, col in self.iter_column_pairs(src):
            # calculate lagged values in a separate columns
            # to use it in inverse_transform() method
            utility_column = f'__LAGGED_{self.lag}_{col}'
            # src[col][-self.lag] references the value `lag` ticks back
            src[utility_column] = src[col][-self.lag]
            src[new_col] = src[col] - src[utility_column]
            self.schema.utility_columns += [utility_column]
        return src
[docs]deffit_pandas(self,df:pd.DataFrame):"""Preprocess data by calculating the difference between the current and lag values (of time series). Parameters ---------- df : pd.DataFrame Dataframe to be preprocessed. """self._shift_df=df.loc[:,self.column_names(df)].shift(self.lag).copy(deep=True)returndf
deftransform_pandas(self,df:pd.DataFrame):ifself._shift_dfisNone:raiseRuntimeError("LaggedDifferences cannot be applied without being fitted on data first")columns=self.column_names(df)suffix_columns=[f"{column}{self.suffix}"forcolumnincolumns]df[suffix_columns]=df[columns].subtract(self._shift_df[columns])returndf
    def inverse_transform(self, prediction_df: pd.DataFrame = None) -> pd.DataFrame:
        """Reverse process data by adding the lagged values to the corresponding
        prediction values.

        Parameters
        ----------
        prediction_df : pd.DataFrame
            Dataframe to be deprocessed.

        Returns
        -------
        pd.DataFrame
            Deprocessed dataframe with the same shape as the input dataframe
            or None if prediction_df is None.

        Raises
        ------
        RuntimeError
            If not fitted and the utility lag columns are missing from
            ``prediction_df``.
        """
        if prediction_df is None:
            return None
        if self._shift_df is None:
            # if operator transformed data in onetick context
            # then we should extract _shift_df from utility_columns
            self._shift_df = pd.DataFrame()
            for column, original_column in self._columns_map.items():
                utility_column = f'__LAGGED_{self.lag}_{original_column}'
                if utility_column in prediction_df.columns:
                    self._shift_df[original_column] = prediction_df[utility_column]
                else:
                    raise RuntimeError("LaggedDifferences cannot deprocess without being fitted on all columns first")
        _df = prediction_df.copy(deep=True)
        # todo: which is less: _df.columns or self._columns_map?
        columns = [column for column in _df.columns if column in self._columns_map]
        original_columns = [self._columns_map[column] for column in columns]
        # undo differencing: original = diff + lagged value, aligned on _df's index
        _df[original_columns] = _df[columns].values + self._shift_df.loc[_df.index, original_columns].values
        return _df
class IntradayAveraging(BasePreprocess):
    """
    Data preprocessing class that calculates the difference between the current
    value and the average value of the same intraday interval over the past N days.

    Note: this preprocessing will remove the first `bins`*`window_days` rows
    from the dataframe.

    Parameters
    ----------
    columns : list
        List of columns to which preprocessing will be applied.
    window_days : int
        Number of days for averaging. Default: 5
    bins : int
        Number of intraday intervals. If None is specified, then it is
        determined automatically by the number of unique hh:mm buckets.
        Default: None
    datetime_column : str
        Name of the column that stores the datetime values. Default: 'Time'
    suffix : str
        Suffix to be added to the column names. Default: ''
    """
    # the rolling intraday averages depend on the latest data: refit each predict
    refit_on_predict = True
    deprocessable = True

    def __init__(self, columns: Union[list, str] = "__all__", window_days=5,
                 bins=None, datetime_column='Time', suffix: str = ""):
        super().__init__(columns=columns, window_days=window_days, bins=bins,
                         datetime_column=datetime_column, suffix=suffix)
        self.window_days = window_days
        self.bins = bins
        self.datetime_column = datetime_column
        # per-row intraday averages frame, filled by fit_pandas()
        self._agg_df = None
        self._columns = None
    def fit_pandas(self, df: pd.DataFrame):
        """Preprocess (timeseries) data by calculating the difference between the
        current value and the average value of the same intraday interval in the
        past period of time.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be preprocessed.
        """
        columns = self.column_names(df)
        df.loc[:, self.datetime_column] = pd.to_datetime(df[self.datetime_column])
        # bucket rows by wall-clock time of day
        df.loc[:, '__hhmm'] = df[self.datetime_column].dt.time
        agg_df = df[columns + ['__hhmm']].copy(deep=True)
        # rolling mean over the last `window_days` observations of the same bucket
        agg_df = agg_df.groupby(by='__hhmm').rolling(self.window_days).mean()
        # NOTE(review): shift(1) is applied to the whole multi-indexed frame, not
        # per group; presumably intended to exclude the current day from its own
        # average — confirm behavior at group boundaries.
        agg_df = agg_df.shift(1).reset_index(level=0).sort_index()
        # TODO Now solution below adjust for num-type index, it needs to find best option to cut incorrect shift vals
        if self.bins is None:
            self.bins = df['__hhmm'].nunique()
        # the first bins*window_days rows cannot have a full window: blank them
        min_agg_index = self.bins * self.window_days + min(df.index)
        agg_df.loc[agg_df.index < min_agg_index, :] = np.nan
        self._agg_df = agg_df
        self._columns = columns
        return df
deftransform_pandas(self,df:pd.DataFrame):assertself._agg_dfisnotNone,"IntradayAveraging cannot be applied without being fitted on data first"new_columns,columns=zip(*self.iter_column_pairs(df))df[list(new_columns)]=df[list(columns)]-self._agg_df[list(columns)]returndf
[docs]definverse_transform(self,prediction_df:pd.DataFrame=None)->pd.DataFrame:"""Reverse process data by adding the average value of the intraday intervals to the corresponding prediction values. Parameters ---------- prediction_df : pd.DataFrame Dataframe to be deprocessed. Returns ------- pd.DataFrame Reverse processed dataframe with the same shape as the input dataframe or None if prediction_df is None. """assertself._agg_dfisnotNone,"IntradayAveraging cannot deprocess without being fitted on data first"ifprediction_dfisNone:returnNone_df=prediction_df.copy(deep=True)# todo: which is less: _df.columns or self._columns_map?columns=[columnforcolumnin_df.columnsifcolumninself._columns_map]original_columns=[self._columns_map[column]forcolumnincolumns]_df[original_columns]=_df[columns].values+self._agg_df.loc[_df.index][original_columns].valuesreturn_df
[docs]defget_init_params(self):"""Override get_init_params in order to replace GroupByColumn with nested preprocessor and add group_by_column to its dict. """_params=copy.deepcopy(self.preprocessor.get_init_params())_params['group_by_column']=Truereturn_params
deffit_pandas(self,df:pd.DataFrame):self._groups=df.groupby(self.schema.group_column_name)forgroup,group_dfinself._groups:# clone preprocessor and save it for transformself._processors[group]=copy.deepcopy(self.preprocessor)self._processors[group].schema=self.schema# fit each preprocessor on a particular ticker# todo: do we need copy below?self._processors[group].fit(group_df.copy())returndfdeftransform_pandas(self,df:pd.DataFrame):assertself._groupsisnotNone,"GroupByColumn cannot be applied without being fitted on data first"# apply transform to each groupforgroup,group_dfinself._groups:transformed_group=self._processors[group].transform(group_df)# initialize new columns with NaNsnew_columns=set(transformed_group.columns)-set(df.columns)fornew_columninnew_columns:ifnew_columnnotindf.columns:df[new_column]=np.nandf.loc[group_df.index]=transformed_groupreturndfdeffit_ot(self,src:otp.Source):returnself.preprocessor.fit_ot(src)deftransform_ot(self,src:otp.Source):returnself.preprocessor.transform_ot(src)
[docs]definverse_transform(self,prediction_df:pd.DataFrame=None)->pd.DataFrame:ifself.preprocessor.deprocessableisFalse:returnprediction_dfassertself._groupsisnotNone,"GroupByColumn cannot be applied without being fitted on data first"ifprediction_dfisNone:returnNone_df=prediction_df.copy(deep=True)_df=_df.groupby(self.schema.group_column_name,group_keys=False).apply(lambdax:self._processors[x.name].inverse_transform(x))# drop group column index# todo: check if it is correct???# _df.index = _df.index.droplevel(0)_df=_df.sort_index()return_df
class RefittableLabelEncoder(preprocessing.LabelEncoder):
    """
    Same as LabelEncoder, but can be fitted on new data without losing the
    mapping of the old data.

    SKLearn LabelEncoder is not refittable because it throws an error if new
    classes are encountered during transform.
    """

    def fit(self, y, **kwargs):
        """Fit on ``y``, keeping any previously seen classes.

        Bug fix: the previous implementation merged old classes into
        ``self.classes_`` and then called ``super().fit(y)``, which overwrote
        ``classes_`` with the classes of ``y`` alone — discarding the merge.
        Fitting the parent on the concatenation of old classes and new labels
        yields the sorted union, preserving the old mapping positions.
        """
        if hasattr(self, 'classes_'):
            y = np.concatenate((self.classes_, np.asarray(y)))
        super().fit(y)
        return self

    def fit_transform(self, y, **kwargs):
        """Refit (merging with old classes) and transform ``y``."""
        self.fit(y)
        return self.transform(y)