class BaseOnetickLoader(BaseDatafeed):
    def __init__(self, **kwargs):
        # make sure all kwargs are set as attributes of self
        for key, value in kwargs.items():
            if not hasattr(self, key):
                setattr(self, key, value)
        # set defaults
        self.timezone = kwargs.get('timezone', 'EST5EDT')
        self.symbols = kwargs.get('symbols', ['AAPL'])
        if isinstance(self.symbols, str):
            self.symbols = [self.symbols]
        super().__init__(**kwargs)
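    # Illustrative sketch of the kwargs-to-attributes behaviour (values are
    # hypothetical; assumes BaseDatafeed accepts arbitrary kwargs):
    #
    #   >>> loader = BaseOnetickLoader(symbols='AAPL', bucket=60)
    #   >>> loader.symbols, loader.bucket   # a bare str is wrapped in a list
    #   (['AAPL'], 60)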
    def get_source(self) -> otp.DataSource:
        """Generate otp.Source for further processing and loading."""
        raise NotImplementedError
    def run(self, src):
        src = self.merge_symbols(src)
        # if the datafeed is not split, then we assume that the whole data is train
        if self.schema.set_name not in src.schema:
            src[self.schema.set_name] = "TRAIN"
        # src = src[self.schema.get_all_columns()]
        run_kwargs = {}
        if len(self.symbols) == 1:
            run_kwargs = dict(symbols=self.symbols)
        df = otp.run(
            src,
            # apply_times_daily=self.apply_times_daily,
            symbol_date=self.end,
            timezone=self.timezone,
            # the minute bar for 9:30-9:31 has the timestamp of 9:31
            start=self.start,
            end=self.end,
            **run_kwargs,
        )
        return df

    def merge_symbols(self, src):
        if len(self.symbols) > 1:
            src = otp.merge([src], symbols=self.symbols, identify_input_ts=True)
            src.drop(columns=['TICK_TYPE'], inplace=True)
        elif len(self.symbols) == 1:
            src['SYMBOL_NAME'] = self.symbols[0]
        return src
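    # Sketch of the two symbol paths (illustrative; assumes a reachable
    # OneTick server): with several symbols, merge_symbols() merges the
    # per-symbol sources and tags rows via SYMBOL_NAME; with exactly one
    # symbol, it is passed straight to otp.run() via run_kwargs.
    #
    #   >>> feed = OneTickBarsDatafeedOT(symbols=['AAPL', 'MSFT'])
    #   >>> df = feed.run(feed.get_source())   # one frame, SYMBOL_NAME column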
    def load(self):
        """
        Main method used to load data.

        Returns
        -------
        result : otp.Source
            Source with the schema set, ready to be executed with ``run``.
        """
        # set schema data
        self.schema.symbols = self.symbols
        self.schema.db = self.db
        self.schema.tick_type = self.tick_type
        src = self.get_source()
        return src
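    # Usage sketch (hypothetical values; load() wires the schema and returns
    # the otp.Source built by get_source(), which run() then executes):
    #
    #   >>> feed = OneTickBarsDatafeedOT()   # defaults: AAPL 1-minute bars
    #   >>> src = feed.load()                # otp.Source with schema set
    #   >>> df = feed.run(src)               # execute and get a pd.DataFrame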
class OneTickBarsDatafeedOT(BaseOnetickLoader):
    """
    OneTick datafeed with bars (Open, High, Low, Close, Volume, Trade Count).

    Parameters
    ----------
    db : str
        Name of the database to use. Default: 'NYSE_TAQ_BARS'.
    tick_type : str
        Tick type to load. Default: 'TRD_1M'.
    symbols : List[str]
        List of symbols to load. Default: ['AAPL'].
    start : otp.datetime
        Start datetime. Default: ``otp.dt(2022, 3, 1, 9, 30)``.
    end : otp.datetime
        End datetime. Default: ``otp.dt(2022, 3, 10, 16, 0)``.
    bucket : int
        Bucket size used to aggregate data (timeframe). Default: 600.
    bucket_time : str
        Bucket time to use: ``start`` or ``end``. Default: ``start``.
    timezone : str
        Timezone to use. Default: 'EST5EDT'.
    columns : list
        List of columns to load.
    apply_times_daily : bool
        Apply the start/end times daily, skipping data outside of the
        specified times for all days. Default: True.
    """

    def __init__(self, **kwargs):
        defaults = dict(
            db='NYSE_TAQ_BARS',
            tick_type='TRD_1M',
            symbols=['AAPL'],
            start=otp.dt(2022, 3, 1, 9, 30),
            end=otp.dt(2022, 3, 10, 16, 0),
            bucket=600,
            bucket_time="start",
            timezone='EST5EDT',
            apply_times_daily=True,
            columns=['Time', 'SYMBOL_NAME', 'OPEN', 'HIGH', 'LOW', 'CLOSE',
                     'TRADE_COUNT', 'VOLUME'],
        )
        defaults.update(kwargs)
        super().__init__(**defaults)
    def get_source(self):
        data = otp.DataSource(
            db=self.db,
            tick_type=self.tick_type,
        )
        data["VOLUME"] = data["VOLUME"].apply(float)
        data, _ = data[data['TRADE_TICK_COUNT'] > 0]
        # aggregate data by bucket_interval
        data = data.agg(
            {
                'OPEN': otp.agg.first(data['FIRST']),
                'HIGH': otp.agg.max(data['HIGH']),
                'LOW': otp.agg.min(data['LOW']),
                'CLOSE': otp.agg.last(data['LAST']),
                'VOLUME': otp.agg.sum(data['VOLUME']),
                'TRADE_COUNT': otp.agg.sum(data['TRADE_TICK_COUNT']),
            },
            bucket_interval=self.bucket,
            bucket_time=self.bucket_time,
        )
        # apply value adjustments (splits, dividends, etc.)
        data = otp.functions.corp_actions(
            data,
            adjustment_date=int(self.end.strftime('%Y%m%d')),
            adjustment_date_tz="GMT",
            adjust_rule='SIZE',
            fields='VOLUME',
        )
        data = otp.functions.corp_actions(
            data,
            adjustment_date=int(self.end.strftime('%Y%m%d')),
            adjustment_date_tz="GMT",
            adjust_rule='PRICE',
            fields='OPEN,HIGH,LOW,CLOSE',
        )
        # filter out data outside of the specified times
        if self.apply_times_daily:
            data = data.time_filter(
                start_time=self.start.strftime('%H%M%S%f')[:-3],
                end_time=self.end.strftime('%H%M%S%f')[:-3],
                timezone=self.timezone,
            )
        # mark VOLUME as nan for empty bars (needed for filtering after lags)
        # TODO: use a holiday calendar: pip install exchange_calendars
        empty, data = data[(data["VOLUME"] == 0) & (data["HIGH"] == otp.nan)]
        empty["VOLUME"] = otp.nan
        data = otp.merge([empty, data])
        data, _ = data[data["VOLUME"] != otp.nan]
        return data
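    # Minimal end-to-end sketch (assumes access to the NYSE_TAQ_BARS
    # database; dates and bucket size are illustrative):
    #
    #   >>> feed = OneTickBarsDatafeedOT(symbols=['AAPL', 'MSFT'],
    #   ...                              start=otp.dt(2022, 3, 1, 9, 30),
    #   ...                              end=otp.dt(2022, 3, 2, 16, 0),
    #   ...                              bucket=60)
    #   >>> bars = feed.run(feed.load())   # adjusted 1-minute OHLCV bars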
    def transform_ot(self, src: otp.Source):
        """
        Calculates a rolling window function for the given columns.

        Parameters
        ----------
        src : otp.Source
            Source to calculate the rolling window function for.
        columns : list
            List of columns to calculate the rolling window function for.

        Returns
        -------
        otp.Source
            Source with the calculated rolling window function in new columns.
        """
        for col in self.column_names(src):
            new_column = f'{col}{self.suffix}{self.window_function.upper()}_{self.window_size}'
            agg_function = getattr(otp.agg, self.window_function)
            agg_dict = {new_column: agg_function(col)}
            src = src.agg(
                agg_dict,
                running=True,
                all_fields=True,
                bucket_interval=self.window_size,
                bucket_units='ticks',
            )
        return src
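    # Naming sketch: with window_function='mean', window_size=20 and an empty
    # suffix (illustrative operator config, not library defaults), a CLOSE
    # column yields a running column 'CLOSE_MEAN_20' over the last 20 ticks.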
class OIDSymbolOT(BasePreprocess):
    """
    Adds an OID column based on the symbol name.
    """

    def transform_ot(self, src: otp.Source):
        # is there another way to avoid the db name in symbols in otp.run()?
        symbology = otp.SymbologyMapping(
            dest_symbology="OID",
            tick_type=self.schema.db + "::ANY",
        )
        src = otp.join(src, symbology, on="all")
        src.rename(columns={"MAPPED_SYMBOL_NAME": "OID"}, inplace=True)
        src["OID"] = src["OID"].apply(int)
        return src
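    # Sketch: after this transform each tick carries an integer OID resolved
    # through the database's symbology mapping, e.g. SYMBOL_NAME 'AAPL' -> an
    # integer OID (the actual value depends on the reference database).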
class ExpressionOperator(BasePipelineOperator):
    def __init__(self, expression, new_column_name: str,
                 inverse_expression=None, apply_kwargs: dict = None):
        super().__init__(
            expression=expression,
            inverse_expression=inverse_expression,
            apply_kwargs=apply_kwargs,
            new_column_name=new_column_name,
        )
        self.expression = expression
        self.inverse_expression = inverse_expression
        self.new_column_name = new_column_name
        self.apply_kwargs = apply_kwargs
        if self.apply_kwargs is None:
            self.apply_kwargs = {}
        self.columns = []  # avoid adding columns to the schema via the parent class

    def transform_ot(self, src: otp.Source):
        src[self.new_column_name] = src.apply(self.expression, **self.apply_kwargs)
        return src

    def transform_pandas(self, df: pd.DataFrame):
        kwargs = {'axis': 1}
        kwargs.update(self.apply_kwargs)
        df[self.new_column_name] = df.apply(self.expression, **kwargs)
        return df

    def transform(self, src: Union[pd.DataFrame, otp.Source]):
        return super().transform(src)

    def save_init_params(self, params, no_class=False):
        params['expression'] = cloudpickle.dumps(params['expression'])
        return super().save_init_params(params, no_class=no_class)

    def inverse_transform(self, prediction_df: pd.DataFrame):
        if not self.inverse_expression:
            return prediction_df
        kwargs = {'axis': 1}
        kwargs.update(self.apply_kwargs)
        prediction_df[self.new_column_name] = prediction_df.apply(
            self.inverse_expression, **kwargs)
        return prediction_df

    @classmethod
    def restore_instance(cls, params):
        params['expression'] = cloudpickle.loads(params['expression'])
        return super().restore_instance(params)


class ToPandas(BasePipelineOperator):
    pass
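# Usage sketch (hypothetical expression; cloudpickle is what lets the lambda
# survive save_init_params/restore_instance round-trips):
#
#   >>> op = ExpressionOperator(
#   ...     expression=lambda row: row['HIGH'] - row['LOW'],
#   ...     new_column_name='RANGE',
#   ... )
#   >>> df = op.transform_pandas(df)   # adds a row-wise RANGE column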