Source code for onetick.ml.interfaces.data_pipelines
import copy
import datetime
from abc import abstractmethod, ABCMeta
import re
from typing import List, Optional, Union

import onetick.py as otp
import pandas as pd

from onetick.ml.utils.paramsaver import BaseParameterSaver
from onetick.ml.utils.schema import DataSchema


class BasePipelineOperator(BaseParameterSaver, metaclass=ABCMeta):

    refit_on_predict = True
    deprocessable = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.schema = DataSchema()
        self.columns = kwargs.get("columns", "__all__")
        self.suffix = kwargs.get('suffix', "_" + self.__class__.__name__.capitalize())
        self._columns_map = {}

    def __str__(self):
        return self.__class__.__name__ + f'({self.columns})'

    def __repr__(self):
        return self.__str__()

    def _set_column_map(self, src):
        self._columns_map = {new_column: column for new_column, column in self.iter_column_pairs(src)}

    def get_new_column_name(self, column):
        return f"{column}{self.suffix}"

    def iter_column_pairs(self, src):
        for column in self.column_names(src):
            yield self.get_new_column_name(column), column

    def _remove_utility_columns(self, columns):
        return [column for column in columns
                if not column.startswith("__")
                and column not in self.schema.utility_columns
                and column != "Time"]

    def _get_columns_list(self, src) -> List[str]:
        if isinstance(src, otp.Source):
            return self._remove_utility_columns(list(src.schema))
        if isinstance(src, pd.DataFrame):
            return self._remove_utility_columns(list(src.columns))
        raise TypeError(f"Unsupported type {type(src)} passed to _get_columns_list() method of {self}!")

    def column_names(self, src) -> List[str]:
        data_columns = self._get_columns_list(src)
        if self.columns == '__all__':
            return data_columns
        elif self.columns == "__features__":
            return self.schema.features_columns
        elif self.columns == "__targets__":
            return self.schema.target_columns
        elif isinstance(self.columns, str):
            self.columns = [self.columns]
        # match columns with regex/string
        all_matched_columns = []
        for column in self.columns:
            matched_columns = []
            for data_column in data_columns:
                if re.fullmatch(column, data_column):
                    matched_columns.append(data_column)
            if not matched_columns:
                raise ValueError(f"No column found for '{column}' in {data_columns}! Requested by {self}.")
            all_matched_columns += matched_columns
        return all_matched_columns

    def transform(self, src: Union[otp.Source, pd.DataFrame]) -> Union[otp.Source, pd.DataFrame]:
        """Transforms DataFrame or otp.Source.

        Parameters
        ----------
        src : DataFrame or otp.Source
            Data feed to be processed.

        Returns
        -------
        DataFrame or otp.Source
            Transformed data feed.
        """
        if isinstance(src, otp.Source):
            return self.transform_ot(src=src)
        elif isinstance(src, pd.DataFrame):
            return self.transform_pandas(df=src)
        raise TypeError(f"Unsupported type {type(src)} passed to transform() method of {self}!")

    def transform_ot(self, src: otp.Source):
        raise NotImplementedError(f"transform_ot() is not implemented for {self.__class__.__name__}!")

    def transform_pandas(self, df: pd.DataFrame):
        raise NotImplementedError(f"transform_pandas() is not implemented for {self.__class__.__name__}!")

    def fit(self, src: Union[pd.DataFrame, otp.Source]):
        """Fit on a given DataFrame or otp.Source.

        Parameters
        ----------
        src : DataFrame or otp.Source
            Data feed to be fitted on.

        Returns
        -------
        DataFrame or otp.Source
            Changed data feed (added columns, etc.)
        """
        if isinstance(src, otp.Source):
            return self.fit_ot(src)
        elif isinstance(src, pd.DataFrame):
            return self.fit_pandas(src)
        raise TypeError(f"Unsupported type {type(src)}")

    def fit_ot(self, src: otp.Source):
        return src

    def fit_pandas(self, df: pd.DataFrame):
        return df

    def fit_transform(self, src: Union[pd.DataFrame, otp.Source]):
        src = self.fit(src)
        src = self.transform(src)
        return src

    def inverse_transform(self, prediction_df: pd.DataFrame = None) -> pd.DataFrame:
        """Reverse process the prediction dataframe.

        Parameters
        ----------
        prediction_df : pd.DataFrame, optional
            Prediction dataframe to be deprocessed, by default None

        Returns
        -------
        pd.DataFrame
            Reverse processed prediction dataframe (if deprocessable)
        """
        for column in prediction_df.columns:
            if column in self._columns_map:
                original_column = self._columns_map[column]
                prediction_df[original_column] = prediction_df[column]
        return prediction_df

    def update_target_columns(self):
        for new_column, column in self._columns_map.items():
            if column in self.schema.target_columns:
                # TODO preserve order
                self.schema.target_columns.remove(column)
                self.schema.target_columns.append(new_column)

class BaseDatafeed(BasePipelineOperator, metaclass=ABCMeta):
    """Base class for all datafeeds.
    """

    @abstractmethod
    def load(self, *args, **kwargs):
        pass

    @classmethod
    def restore_instance(cls, params):
        if 'start' in params:
            params['start'] = otp.dt(*params['start'])
        if 'end' in params:
            params['end'] = otp.dt(*params['end'])
        return cls(**params)

    def save_init_params(self, params, no_class=False):
        params = copy.deepcopy(params)
        for arg in ['start', 'end']:
            if arg in params and isinstance(params[arg], otp.types.datetime):
                a = params[arg]
                res = [a.year, a.month, a.day, a.hour, a.minute,
                       a.second, a.microsecond, a.nanosecond]
                while res[-1] == 0:
                    res.pop(-1)
                params[arg] = res
        return super().save_init_params(params, no_class=no_class)

    def merge_symbols(self, src):
        return src

    def run(self, src):
        return src
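
# ---------------------------------------------------------------------------
# Illustrative round trip (not part of this module): save_init_params() turns
# otp.dt start/end values into compact lists with trailing zero components
# stripped, and restore_instance() rebuilds the datetimes from those lists.
#
#     params = {'start': otp.dt(2022, 3, 1, 9, 30), 'end': otp.dt(2022, 3, 2)}
#     # after save_init_params(): {'start': [2022, 3, 1, 9, 30], 'end': [2022, 3, 2]}
#     # restore_instance() then rebuilds otp.dt(*[2022, 3, 1, 9, 30]) etc.
# ---------------------------------------------------------------------------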

class BasePreprocess(BasePipelineOperator, metaclass=ABCMeta):
    """Base class for all preprocessors.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.group_by_column = kwargs.pop('group_by_column', None)
        self._proc_df = None

    def __new__(cls, *args, **kwargs):
        # We override this method in order to automatically create
        # `GroupByColumn` classes instead when `group_by_column` is set.
        # Based on https://github.com/encode/django-rest-framework/blob/master/rest_framework/serializers.py#L120
        from onetick.ml.impl import GroupByColumn
        group_by_column = kwargs.pop('group_by_column', False)
        if group_by_column and cls != GroupByColumn:
            return GroupByColumn(preprocessor=cls(*args, **kwargs),
                                 columns=kwargs.get('columns', '__all__'))
        instance = super().__new__(cls)
        instance._args = args
        instance._kwargs = kwargs
        return instance

    # Allow type checkers to make serializers generic.
    def __class_getitem__(cls, *args, **kwargs):
        return cls

    def transform(self, src: Union[pd.DataFrame, otp.Source]) -> Union[pd.DataFrame, otp.Source]:
        """Transforms DataFrame or otp.Source.

        Parameters
        ----------
        src : DataFrame or otp.Source
            Data feed to be processed.

        Returns
        -------
        DataFrame or otp.Source
            Transformed data feed.
        """
        return super().transform(src)

    def fit(self, src: Union[pd.DataFrame, otp.Source]):
        """Fit on a given DataFrame or otp.Source.

        Parameters
        ----------
        src : DataFrame or otp.Source
            Data feed to be fitted on.

        Returns
        -------
        DataFrame or otp.Source
            Changed data feed (added columns, etc.)
        """
        self._set_column_map(src)
        return super().fit(src)
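
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this module): because __new__ intercepts the
# `group_by_column` keyword, constructing any preprocessor with it returns a
# GroupByColumn wrapper around that preprocessor instead of the preprocessor
# itself.  The SomeScaler name is hypothetical.
#
#     scaler = SomeScaler(columns="PRICE", group_by_column="SYMBOL_NAME")
#     # type(scaler) is onetick.ml.impl.GroupByColumn; it wraps
#     # SomeScaler(columns="PRICE") and is configured with columns="PRICE".
# ---------------------------------------------------------------------------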

class BaseFeatures(BasePipelineOperator, metaclass=ABCMeta):
    """Base class for all feature extractors.
    """

    def __init__(self, **kwargs):
        self.added_features = []
        self.schema = None
        super().__init__(**kwargs)

class BaseSplitting(BasePipelineOperator, metaclass=ABCMeta):
    """Base class for all splitting methods.
    """

class BaseFilter(BasePipelineOperator, metaclass=ABCMeta):
    """Base class for filters.
    """