Source code for dsframework.impl.data_pipelines.data_pipeline

import numpy as np
from numpy.random import normal
import pandas as pd
from dsframework.interfaces import BaseDatafeed, BaseFeatures, BaseSplitting
from sklearn.model_selection import train_test_split
from datetime import datetime
import random
from typing import Optional, Union
import onetick.py as otp

from dsframework.interfaces.data_pipelines import BaseFilter, BasePipelineOperator, BasePreprocess


class RandomBarsDatafeed(BaseDatafeed):
    """
    Datafeed with randomly generated bars (Open, High, Low, Close, Volume, Time).

    Parameters
    ----------
    bars : int
        Number of bars to generate for each ticker. Default: 5000.
    bucket : int
        Timeframe of bars to be used (specified in seconds). Default: 600.
    candle_body_baseval : float
        Parameter that determines the average body size of candlestick bars. Default: 0.01.
    candle_shadow_baseval : float
        Parameter that determines the average upper/lower shadow size of candlestick bars. Default: 0.005.
    random_state : int
        Random state (seed). Default: None.
    group_column : str, optional
        Column name to be used for grouping (tickers, etc.). Default: None.
    """

    def __init__(self,
                 bars=5000,
                 bucket=600,
                 candle_body_baseval=0.01,
                 candle_shadow_baseval=0.005,
                 random_state=None,
                 group_column=None):
        super().__init__(bars=bars,
                         bucket=bucket,
                         candle_body_baseval=candle_body_baseval,
                         candle_shadow_baseval=candle_shadow_baseval,
                         random_state=random_state,
                         group_column=group_column)
        self.bars = bars
        self.bucket = bucket
        self.candle_body_baseval = candle_body_baseval
        self.candle_shadow_baseval = candle_shadow_baseval
        self.random_state = random_state
        self.group_column = group_column

    def load(self):
        start_timestamp = 1546286400
        start_price = 50.00
        min_deltas = (-0.01, 0.00, 0.01)
        mean_volume = 5000
        if self.group_column:
            tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOG', 'FB']
        else:
            tickers = ['AAPL']
        close_price = start_price
        volume = mean_volume
        res = {
            'OPEN': [],
            'HIGH': [],
            'LOW': [],
            'CLOSE': [],
            'VOLUME': [],
            'Time': [],
        }
        if self.group_column:
            res[self.group_column] = []
        random.seed(self.random_state)
        np.random.seed(self.random_state)
        for ticker in tickers:
            for i in range(self.bars):
                date = datetime.fromtimestamp(start_timestamp + i * self.bucket)
                open_price = close_price + random.choice(min_deltas)
                sign_price = 1 if random.random() >= open_price / (start_price * 2) else -1
                candle_body_change = abs(self.candle_body_baseval * normal(0, 1))
                candle_up_shadow_change = abs(self.candle_shadow_baseval * normal(0, 1))
                candle_down_shadow_change = abs(self.candle_shadow_baseval * normal(0, 1))
                close_price = open_price * (1 + sign_price * candle_body_change)
                high_price = max(open_price, close_price) * (1 + candle_up_shadow_change)
                low_price = min(open_price, close_price) * (1 - candle_down_shadow_change)
                hl_change = high_price / low_price - 1
                hl_mean_change = self.candle_body_baseval / self.candle_shadow_baseval / 100
                hl_relative_cur = hl_change / hl_mean_change
                vol_relative_prev = volume / (mean_volume * 2)
                if hl_change > hl_mean_change:
                    sign_volume = 1 if random.random() * hl_relative_cur >= vol_relative_prev else -0.5
                else:
                    sign_volume = 0.5 if random.random() * hl_relative_cur >= vol_relative_prev else -1
                # todo: use only -1 and 1 for sign_volume, but make volume increase in proportion to
                #  hl_relative_cur and volume decrease in proportion to 1/hl_relative_cur
                volume = volume * (1 + 9 * sign_volume * hl_change)
                res['OPEN'].append(round(open_price, 2))
                res['HIGH'].append(round(high_price, 2))
                res['LOW'].append(round(low_price, 2))
                res['CLOSE'].append(round(close_price, 2))
                res['VOLUME'].append(round(volume))
                res['Time'].append(str(date))
                if self.group_column:
                    res[self.group_column].append(ticker)
        df = pd.DataFrame(res).set_index('Time')
        self.schema.context = "pandas"
        return df
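
# Usage sketch (illustrative, not part of the original module): assuming the pipeline
# framework attaches a `schema` object before `load()` is called, the synthetic datafeed
# could be used roughly as follows to get a small OHLCV frame for experiments:
#
#     feed = RandomBarsDatafeed(bars=100, bucket=60, random_state=42, group_column='TICKER')
#     bars_df = feed.load()  # pandas DataFrame with OPEN/HIGH/LOW/CLOSE/VOLUME, indexed by Time
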
class CSVDatafeed(BaseDatafeed):
    """
    Datafeed class for loading data from CSV.

    Parameters
    ----------
    params : dict
        Arguments passed directly to the `pd.read_csv()` function.
    suffix : str
        Add a suffix to all columns of the result dataframe. Default: None.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.suffix = kwargs.pop('suffix', None)
        self.params = dict(kwargs)

    def load(self):
        df = pd.read_csv(**self.params) if self.params else None
        if self.suffix:
            df = df.add_suffix(str(self.suffix))
        self.schema.context = "pandas"
        return df
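
# Usage sketch (illustrative): keyword arguments other than `suffix` are forwarded to
# `pd.read_csv()`, so any reader option such as `filepath_or_buffer`, `sep` or `parse_dates`
# can be passed through; the file name below is hypothetical:
#
#     feed = CSVDatafeed(filepath_or_buffer='bars.csv', parse_dates=['Time'], suffix='_RAW')
#     df = feed.load()  # columns become 'OPEN_RAW', 'HIGH_RAW', ... because of the suffix
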
class OneTickBarsDatafeed(BaseDatafeed):
    """
    OneTick datafeed with bars (Open, High, Low, Close, Volume, Trade Count).

    Parameters
    ----------
    db : str
        Name of the database to use. Default: 'NYSE_TAQ_BARS'.
    tick_type : str
        Tick type to load. Default: 'TRD_1M'.
    symbols : List[str]
        List of symbols to load. Default: ['AAPL'].
    start : otp.datetime
        Start datetime. Default: `datetime(2022, 3, 1, 9, 30)`.
    end : otp.datetime
        End datetime. Default: `datetime(2022, 3, 10, 16, 0)`.
    bucket : int
        Bucket size used to aggregate data (timeframe). Default: 600.
    bucket_time : str
        Bucket time to use: `start` or `end`. Default: `start`.
    timezone : str
        Timezone to use. Default: 'EST5EDT'.
    suffix : str
        Add a suffix to all columns of the result dataframe. Default: None.
    apply_times_daily : bool
        Apply the start/end times daily, skipping data outside of the specified times for all days.
        Default: True.
    """

    def __init__(self, **kwargs):
        defaults = dict(db='NYSE_TAQ_BARS',
                        tick_type='TRD_1M',
                        symbols=['AAPL'],
                        start=otp.dt(2022, 3, 1, 9, 30),
                        end=otp.dt(2022, 3, 10, 16, 0),
                        bucket=600,
                        bucket_time="start",
                        timezone='EST5EDT',
                        apply_times_daily=True)
        defaults.update(kwargs)
        super().__init__(**defaults)
        self.db = defaults.get('db')
        self.tick_type = defaults.get('tick_type')
        self.symbols = defaults.get('symbols')
        self.start = defaults.get('start')
        self.end = defaults.get('end')
        self.bucket = defaults.get('bucket')
        self.timezone = defaults.get('timezone')
        self.bucket_time = defaults.get('bucket_time')
        self.suffix = defaults.get('suffix', None)
        self.apply_times_daily = defaults.get('apply_times_daily')
    def load(self, *args):
        """
        Main method used to load data.

        Returns
        -------
        result : pd.DataFrame
            Loaded data
        """
        data = otp.DataSource(db=self.db, tick_type=self.tick_type)
        data["VOLUME"] = data["VOLUME"].apply(float)
        data, _ = data[data['TRADE_TICK_COUNT'] > 0]
        data = data.agg({'OPEN': otp.agg.first(data['FIRST']),
                         'HIGH': otp.agg.max(data['HIGH']),
                         'LOW': otp.agg.min(data['LOW']),
                         'CLOSE': otp.agg.last(data['LAST']),
                         'VOLUME': otp.agg.sum(data['VOLUME']),
                         'TRADE_COUNT': otp.agg.sum(data['TRADE_TICK_COUNT'])},
                        bucket_time=self.bucket_time,
                        bucket_interval=self.bucket)
        data = otp.functions.corp_actions(data,
                                          adjustment_date=int(self.end.strftime('%Y%m%d')),
                                          adjust_rule='SIZE',
                                          fields='VOLUME')
        data = otp.functions.corp_actions(data,
                                          adjustment_date=int(self.end.strftime('%Y%m%d')),
                                          adjust_rule='PRICE',
                                          fields='OPEN,HIGH,LOW,CLOSE')
        # filter out data outside of the specified times
        if self.apply_times_daily:
            data = data.time_filter(start_time=self.start.strftime('%H%M%S%f')[:-3],
                                    end_time=self.end.strftime('%H%M%S%f')[:-3],
                                    timezone=self.timezone)
        merged_data = otp.merge([data], symbols=self.symbols, identify_input_ts=True)
        merged_data, _ = merged_data[merged_data['TRADE_COUNT'] > 0]
        merged_data = merged_data[['Time', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOLUME', 'TRADE_COUNT', 'SYMBOL_NAME']]
        df = otp.run(merged_data,
                     # apply_times_daily=self.apply_times_daily,
                     start=self.start,  # the minute bar for 9:30-9:31 has the timestamp of 9:31
                     end=self.end,
                     symbol_date=self.end,
                     # the minute bar for 3:59-4:00 has the timestamp of 4:00 but the end timestamp is not included
                     timezone=self.timezone)
        df = df.set_index('Time')
        if self.suffix:
            df = df.add_suffix(str(self.suffix))
        self.schema.context = "pandas"
        return df
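
# Usage sketch (illustrative, assumes access to a OneTick server with the NYSE_TAQ_BARS
# database): any of the defaults can be overridden per keyword, e.g. 5-minute bars for two symbols:
#
#     feed = OneTickBarsDatafeed(symbols=['AAPL', 'MSFT'],
#                                start=otp.dt(2022, 3, 1, 9, 30),
#                                end=otp.dt(2022, 3, 2, 16, 0),
#                                bucket=300)
#     df = feed.load()  # OPEN/HIGH/LOW/CLOSE/VOLUME/TRADE_COUNT per SYMBOL_NAME, indexed by Time
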
class SelectFeatures(BasePipelineOperator):
    """
    Class that selects the specified columns as features.

    Parameters
    ----------
    columns : list
        List of columns that are defined as features.
    override : bool
        If True, override the existing list of feature columns.
        If False, add the columns to the existing list of feature columns.
    """

    def __init__(self, columns=None, override=False):
        if columns is None:
            raise ValueError("columns must be specified for using SelectFeatures")
        super().__init__(columns=columns, override=override)
        self.override = override

    def transform_pandas(self, df: pd.DataFrame):
        for col in self.column_names(df):
            if col not in df.columns:
                raise ValueError(f"Column {col} is not in the dataframe")
        if self.override:
            self.schema.features_columns = self.column_names(df)
        else:
            self.schema.add_features(self.column_names(df))
        return df

    def transform_ot(self, src: otp.Source):
        for col in self.column_names(src):
            if col not in src.schema:
                raise ValueError(f"Column {col} is not in the otp.Source schema")
        if self.override:
            self.schema.features_columns = self.column_names(src)
        else:
            self.schema.add_features(self.column_names(src))
        return src
class SelectTargets(BasePipelineOperator):
    """
    Class that selects the specified columns as targets.

    Parameters
    ----------
    columns : list
        List of columns that are defined as targets.
    override : bool
        If True, override the existing list of target columns.
        If False, add the columns to the existing list of target columns.
    shift : bool
        DON'T USE IT YET! If True, shift the resulting target columns by one row, providing
        the **next** value of the target column for each row. It is useful when we want to
        predict the next value of a time series and want to use the current value as a feature.
        Default is False.
    """

    def __init__(self, columns, override: bool = False, shift: bool = False):
        super().__init__(columns=columns, override=override, shift=shift)
        self.override = override
        self.shift = shift

    def add_targets(self, src):
        columns_list = self.column_names(src)
        for col in columns_list:
            if col in self.schema.features_columns and not self.shift:
                raise ValueError(f"Column {col} is already defined as a feature, "
                                 "it is prohibited to define it as a target in order to avoid data leakage. "
                                 "Set shift=True to shift the target column by one row.")
            if f"__UNPROCESSED.{col}" not in self.schema.unprocessed_target_columns:
                self.schema.unprocessed_target_columns += [f"__UNPROCESSED.{col}"]
        if self.override:
            self.schema.target_columns = columns_list
        else:
            self.schema.add_targets(columns_list)

    def transform_pandas(self, df: pd.DataFrame):
        for col in self.column_names(df):
            if col not in df.columns:
                raise ValueError(f"Column {col} is not in the dataframe")
            if self.shift:
                df[f"__UNPROCESSED.{col}"] = df[col].shift(-1)
                # drop the last row, as it contains NaN in the target column
                # and also contains data leakage from the future
                df.drop(df.index.max(), inplace=True)
            else:
                df[f"__UNPROCESSED.{col}"] = df[col]
        self.add_targets(df)
        return df

    def transform_ot(self, src: otp.Source):
        for col in self.column_names(src):
            if col not in src.schema:
                raise ValueError(f"Column {col} is not in the otp.Source schema")
            target_name = f"__UNPROCESSED.{col}"
            if self.shift:
                if target_name in src.schema:
                    src.drop(target_name, inplace=True)
                src[target_name] = src[col][1]
                agg = src.agg({"__TOTAL": otp.agg.count()})
                src = otp.join(src, agg, on="all")
                # drop the last row, as it contains NaN in the target column
                src.state_vars["__COUNT"] = 0
                src.state_vars["__COUNT"] = src.state_vars["__COUNT"] + 1
                src, _ = src[src.state_vars["__COUNT"] < src["__TOTAL"]]
                src.drop("__TOTAL", inplace=True)
            else:
                src[target_name] = src[col]
        self.add_targets(src)
        return src
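
# Usage sketch (illustrative): inside a pipeline these operators only mark columns on the
# shared schema; how they are chained depends on the surrounding pipeline class, which is
# assumed here. A typical combination marks price/volume columns as features and a separate
# column as the target:
#
#     select_features = SelectFeatures(columns=['OPEN', 'VOLUME'])
#     select_target = SelectTargets(columns=['CLOSE'])
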
class CalcLags(BaseFeatures):
    """
    Class that calculates lag values for selected columns.

    Note: when `remove_first_rows=True`, this feature processing removes the first
    `max(periods)` rows from the dataframe, as there is no lagged value for them.

    Parameters
    ----------
    columns : list
        List of columns for which lags are calculated.
    periods : list, optional
        Values of the lag to be used. The maximum value of `periods` is used to remove
        the first rows from the dataframe.
        Default: [1]
    suffix : str, optional
        The names of columns with calculated lag values have a suffix of the form:
        `f"{suffix}{value_of_lag}"`
        Default: "_LAG_"
    remove_first_rows : bool, optional
        If True, remove the first `max(periods)` rows from the dataframe.
        Default: False
    """

    def __init__(self, columns: Optional[list], periods: Optional[list] = None,
                 remove_first_rows: bool = False, group_by_column: bool = False,
                 suffix: str = '_LAG_'):
        self.periods = [1] if periods is None else periods
        super().__init__(columns=columns, periods=self.periods,
                         remove_first_rows=remove_first_rows, suffix=suffix,
                         group_by_column=group_by_column)
        self.suffix = suffix
        self.remove_first_rows = remove_first_rows
        self.group_by_column = group_by_column
    def transform_ot(self, src: otp.Source) -> otp.Source:
        """
        Calculates lags for the given columns inside a OneTick pipeline.

        Parameters
        ----------
        src : otp.Source
            Source to calculate lags for.

        Returns
        -------
        otp.Source
            Source with calculated lags in new columns.
        """
        for col in self.column_names(src):
            for lag in self.periods:
                src[col] = src[col].astype(float)
                src[f'{col}{self.suffix}{lag}'] = src[col][-lag]
        if self.remove_first_rows:
            src = src.agg({"__ORDER_NUMBER": otp.agg.count()}, running=True, all_fields=True)
            src, _ = src[src["__ORDER_NUMBER"] > max(self.periods)]
        return src
    def transform_pandas(self, df: pd.DataFrame):
        calculated_columns = []
        group_column_name = self.schema.group_column_name
        if group_column_name:
            if group_column_name not in df.columns:
                raise ValueError(f"column with groups `{group_column_name}` not in the input DataFrame")
            grouped_df = df.groupby(group_column_name)
        for column in self.column_names(df):
            if column not in df.columns:
                raise ValueError(f"column `{column}` not in the input DataFrame")
            for lag in self.periods:
                feature_col_name = f'{column}{self.suffix}{lag}'
                if group_column_name:
                    df[feature_col_name] = grouped_df[column].shift(lag)
                else:
                    df[feature_col_name] = df[column].shift(lag)
                calculated_columns.append(feature_col_name)
        if self.remove_first_rows:
            # remove the first rows, as there is no lagged value for them
            if self.group_by_column:
                # remove the first `max(periods)` rows from each group
                df.drop(grouped_df.head(max(self.periods)).index, inplace=True)
            else:
                # remove the first `max(periods)` rows
                df.drop(df[:max(self.periods)].index, inplace=True)
        return df
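
# Usage sketch (illustrative): for columns=['CLOSE'] and periods=[1, 5] the pandas transform
# adds 'CLOSE_LAG_1' and 'CLOSE_LAG_5' columns via `shift()`; with remove_first_rows=True the
# first max(periods)=5 rows are dropped because they have no lagged value:
#
#     lags = CalcLags(columns=['CLOSE'], periods=[1, 5], remove_first_rows=True)
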
class PercentageSplitter(BaseSplitting):
    """
    Class for splitting data into X (features) and Y (target) sets, as well as into
    train-test-validate subsets (samples are determined using percentage sizes for the
    validation and test subsets).

    Parameters
    ----------
    val_size : float
        The size of the validation subset. Default: 0
    test_size : float
        The size of the test subset. Default: 0.15
    shuffle : bool
        Whether or not to shuffle the data before splitting. Default: False
    """

    def __init__(self, val_size=0, test_size=0.15, shuffle=False):
        super().__init__(shuffle=shuffle, val_size=val_size, test_size=test_size)
        self.shuffle = shuffle
        self.test_size = test_size
        self.val_size = val_size

    def transform_pandas(self, df: pd.DataFrame):
        if self.test_size > 0:
            train_val, test = train_test_split(df.index, test_size=self.test_size, shuffle=self.shuffle)
        else:
            train_val = df.index
            test = []
        if self.val_size > 0:
            train, val = train_test_split(train_val, test_size=self.val_size, shuffle=self.shuffle)
        else:
            val = []
        df["__SPLIT"] = "TRAIN"
        df.loc[test, "__SPLIT"] = "TEST"
        df.loc[val, "__SPLIT"] = "VAL"
        return df
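
# Usage sketch (illustrative): with shuffle=False the split is sequential over the index,
# which is usually preferable for time series. Note that test_size is taken from all rows,
# while val_size is then taken from the remaining train+val rows:
#
#     splitter = PercentageSplitter(val_size=0.15, test_size=0.15, shuffle=False)
#     df = splitter.transform_pandas(df)  # adds a __SPLIT column with TRAIN/VAL/TEST labels
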
class IndexSplitter(BaseSplitting):
    """
    Class for splitting data into X (features) and Y (target) sets, as well as into
    train-test-validate subsets (samples are determined using indexes for the validation
    and test subsets).

    Parameters
    ----------
    val_indexes : list
        The indexes of the validation subset. Default: []
    test_indexes : list
        The indexes of the test subset. Default: []
    """

    def __init__(self, val_indexes=None, test_indexes=None):
        super().__init__(val_indexes=val_indexes, test_indexes=test_indexes)
        self.test_indexes = [] if test_indexes is None else test_indexes
        self.val_indexes = [] if val_indexes is None else val_indexes

    def transform_pandas(self, df: pd.DataFrame):
        val_test_indexes = self.val_indexes + self.test_indexes
        train_indexes = df[~df.index.isin(val_test_indexes)].index
        # convert the lists to sets, because lookup in a set is O(1),
        # which is beneficial when working with big data
        val_set = frozenset(self.val_indexes)
        val_indexes = [x for x in list(df.index) if x in val_set]
        test_set = frozenset(self.test_indexes)
        test_indexes = [x for x in list(df.index) if x in test_set]
        df["__SPLIT"] = ""
        df.loc[train_indexes, "__SPLIT"] = "TRAIN"
        df.loc[test_indexes, "__SPLIT"] = "TEST"
        df.loc[val_indexes, "__SPLIT"] = "VAL"
        return df
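
# Usage sketch (illustrative): explicit index labels go straight into the VAL/TEST buckets,
# everything else becomes TRAIN; the index values below are hypothetical:
#
#     splitter = IndexSplitter(val_indexes=['2019-01-02 10:00:00'],
#                              test_indexes=['2019-01-02 10:10:00'])
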
class TimeSplitter(BaseSplitting):
    """
    Class for splitting data into X (features) and Y (target) sets, as well as into
    train-test-validate subsets (samples are determined using time ranges for the
    validation and test subsets).

    Parameters
    ----------
    datetime_column : str
        Name of the column that stores the datetime values. Set '' to select the index column.
        Default: 'Time'
    val_time_range : tuple
        Tuple of the start and end datetimes for the validation subset.
        The start time is included in the range, the end time is not.
        Default: ()
    test_time_range : tuple
        Tuple of the start and end datetimes for the test subset.
        The start time is included in the range, the end time is not.
        Default: ()
    """

    def __init__(self, datetime_column='Time', val_time_range=(), test_time_range=()):
        super().__init__(datetime_column=datetime_column,
                         val_time_range=val_time_range,
                         test_time_range=test_time_range)
        self.datetime_column = datetime_column
        self.val_time_range = val_time_range
        self.test_time_range = test_time_range

    def transform_ot(self, src: otp.Source) -> otp.Source:
        src['__SPLIT'] = src["Time"].apply(
            lambda x: "TEST" if x >= self.test_time_range[0] and x < self.test_time_range[1]
            else "VAL" if x >= self.val_time_range[0] and x < self.val_time_range[1]
            else "TRAIN"
        )
        return src

    def transform_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
        # define datetime_series here so as not to keep the series in memory as an instance field,
        # and also to be able to call the split function independently
        if self.datetime_column:
            assert self.datetime_column in df.columns, f"column {self.datetime_column} not in input DataFrame"
            datetime_series = df[self.datetime_column]
        else:
            datetime_series = df.index
        if self.val_time_range and self.val_time_range[0] and self.val_time_range[1]:
            val_indexes = df.loc[((datetime_series >= self.val_time_range[0]) &
                                  (datetime_series < self.val_time_range[1]))].index
        else:
            val_indexes = []
        if self.test_time_range and self.test_time_range[0] and self.test_time_range[1]:
            test_indexes = df.loc[((datetime_series >= self.test_time_range[0]) &
                                   (datetime_series < self.test_time_range[1]))].index
        else:
            test_indexes = []
        val_test_indexes = list(val_indexes) + list(test_indexes)
        train_indexes = df[~df.index.isin(val_test_indexes)].index
        # convert the lists to sets, because lookup in a set is O(1),
        # which is beneficial when working with big data
        val_set = frozenset(val_indexes)
        val_indexes = [x for x in list(df.index) if x in val_set]
        test_set = frozenset(test_indexes)
        test_indexes = [x for x in list(df.index) if x in test_set]
        df["__SPLIT"] = ""
        df.loc[train_indexes, "__SPLIT"] = "TRAIN"
        df.loc[test_indexes, "__SPLIT"] = "TEST"
        df.loc[val_indexes, "__SPLIT"] = "VAL"
        return df
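
# Usage sketch (illustrative): the ranges are half-open (start inclusive, end exclusive), and
# the column named in `datetime_column` must be comparable with the supplied bounds, e.g.
# pandas Timestamps; the dates below are hypothetical:
#
#     splitter = TimeSplitter(datetime_column='Time',
#                             val_time_range=(pd.Timestamp('2019-02-01'), pd.Timestamp('2019-03-01')),
#                             test_time_range=(pd.Timestamp('2019-03-01'), pd.Timestamp('2019-04-01')))
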
class FilterValues(BaseFilter):
    """
    Filter that drops rows containing the specified kinds of values
    ('na'/'nan', 'zeros', 'negatives') in the given columns.
    """

    def __init__(self, columns: Union[list, str] = "__all__",
                 exclude_values: Union[list, str] = 'na',
                 exclude_from_test_set_only: bool = False):
        super().__init__(columns=columns,
                         exclude_values=exclude_values,
                         exclude_from_test_set_only=exclude_from_test_set_only)
        self.exclude_values = exclude_values
        self.exclude_from_test_set_only = exclude_from_test_set_only
        if isinstance(self.exclude_values, str):
            self.exclude_values = [self.exclude_values]

    def transform_ot(self, src: otp.Source):
        for col in self.column_names(src):
            for value in self.exclude_values:
                if value == 'zeros':
                    src, _ = src[src[col] != 0]
                elif value == 'nan' or value == 'na':
                    src, _ = src[src[col] != otp.nan]
                elif value == 'negatives':
                    src, _ = src[src[col] >= 0]
        return src

    def transform_pandas(self, df: pd.DataFrame):
        operator_map = {'zeros': lambda c: c == 0,
                        'negatives': lambda c: c < 0,
                        'nan': lambda c: c.isna(),
                        'na': lambda c: c.isna()}
        for exclude_value in self.exclude_values:
            _operator = operator_map[exclude_value]
            for column in self.column_names(df):
                if self.exclude_from_test_set_only:
                    # TODO: hardcoded column name __SPLIT, should be a schema-based value
                    # TODO: hardcoded value TEST, should be a constant
                    drop_indexes = _operator(df[df["__SPLIT"] == "TEST"][column])
                else:
                    drop_indexes = _operator(df[column])
                df.drop(drop_indexes[drop_indexes].index, inplace=True)
        return df
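
# Usage sketch (illustrative): the example below drops rows where VOLUME is zero or missing:
#
#     flt = FilterValues(columns=['VOLUME'], exclude_values=['zeros', 'na'])
#     df = flt.transform_pandas(df)
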