import numpy as np
from numpy.random import normal
import pandas as pd
from onetick.ml.interfaces import BaseDatafeed, BaseFeatures, BaseSplitting
from sklearn.model_selection import train_test_split
from datetime import datetime
import random
from typing import Optional, Union
import onetick.py as otp
from onetick.ml.interfaces.data_pipelines import BaseFilter, BasePipelineOperator, BasePreprocess
class RandomBarsDatafeed(BaseDatafeed):
"""
Datafeed with randomly generated bars (Open, High, Low, Close, Volume, Time).
Parameters
----------
bars : int
Number of bars to generate for each ticker.
Default: 5000.
bucket: int
Timeframe of bars to be used (specified in seconds).
Default: 600
candle_body_baseval: float
Parameter that determines the average body size of candlestick bars.
Default: 0.01
candle_shadow_baseval: float
Parameter that determines the average upper/lower shadow size of candlestick bars.
Default: 0.005
random_state: int
Random state (seed).
Default: None.
group_column: str, optional
Column name to be used for grouping (tickers, etc.).
Default: None.
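Examples
--------
A minimal sketch of standalone usage (assumes `BaseDatafeed` wires up the schema on construction)::

    feed = RandomBarsDatafeed(bars=100, bucket=60, random_state=42)
    df = feed.load()
    # df is a pandas DataFrame indexed by 'Time' with OPEN, HIGH, LOW, CLOSE, VOLUME columns

Passing `group_column` (e.g. `group_column='TICKER'`) generates bars for several tickers and adds that column.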
"""
def __init__(self, bars=5000,
bucket=600,
candle_body_baseval=0.01,
candle_shadow_baseval=0.005,
random_state=None,
group_column=None
):
super().__init__(bars=bars,
bucket=bucket,
candle_body_baseval=candle_body_baseval,
candle_shadow_baseval=candle_shadow_baseval,
random_state=random_state,
group_column=group_column,
)
self.bars = bars
self.bucket = bucket
self.candle_body_baseval = candle_body_baseval
self.candle_shadow_baseval = candle_shadow_baseval
self.random_state = random_state
self.group_column = group_column
def load(self):
start_timestamp = 1546286400
start_price = 50.00
min_deltas = (-0.01, 0.00, 0.01)
mean_volume = 5000
if self.group_column:
tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOG', 'FB']
else:
tickers = ['AAPL']
close_price = start_price
volume = mean_volume
res = {
'OPEN': [],
'HIGH': [],
'LOW': [],
'CLOSE': [],
'VOLUME': [],
'Time': [],
}
if self.group_column:
res[self.group_column] = []
random.seed(self.random_state)
np.random.seed(self.random_state)
for ticker in tickers:
for i in range(self.bars):
date = datetime.fromtimestamp(start_timestamp + i * self.bucket)
open_price = close_price + random.choice(min_deltas)
sign_price = 1 if random.random() >= open_price / (start_price * 2) else -1
candle_body_change = abs(self.candle_body_baseval * normal(0, 1))
candle_up_shadow_change = abs(self.candle_shadow_baseval * normal(0, 1))
candle_down_shadow_change = abs(self.candle_shadow_baseval * normal(0, 1))
close_price = open_price * (1 + sign_price * candle_body_change)
high_price = max(open_price, close_price) * (1 + candle_up_shadow_change)
low_price = min(open_price, close_price) * (1 - candle_down_shadow_change)
hl_change = high_price / low_price - 1
hl_mean_change = self.candle_body_baseval / self.candle_shadow_baseval / 100
hl_relative_cur = hl_change / hl_mean_change
vol_relative_prev = volume / (mean_volume * 2)
if hl_change > hl_mean_change:
sign_volume = 1 if random.random() * hl_relative_cur >= vol_relative_prev else -0.5
else:
sign_volume = 0.5 if random.random() * hl_relative_cur >= vol_relative_prev else -1
# TODO: use only -1 and 1 for sign_volume, but make volume increase in proportion to hl_relative_cur
# and decrease in proportion to 1 / hl_relative_cur
volume = volume * (1 + 9 * sign_volume * hl_change)
res['OPEN'].append(round(open_price, 2))
res['HIGH'].append(round(high_price, 2))
res['LOW'].append(round(low_price, 2))
res['CLOSE'].append(round(close_price, 2))
res['VOLUME'].append(round(volume))
res['Time'].append(str(date))
if self.group_column:
res[self.group_column].append(ticker)
df = pd.DataFrame(res).set_index('Time')
self.schema.context = "pandas"
return df
class CSVDatafeed(BaseDatafeed):
"""
Datafeed class for loading data from CSV.
Parameters
----------
**kwargs
Keyword arguments that are passed directly to the `pd.read_csv()` function.
suffix : str
Add a suffix to all columns of the result dataframe.
Default: None
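Examples
--------
A minimal sketch (the file name is hypothetical; any `pd.read_csv()` keyword argument can be passed through)::

    feed = CSVDatafeed(filepath_or_buffer='bars.csv', parse_dates=['Time'], suffix='_RAW')
    df = feed.load()
    # DataFrame read from 'bars.csv' with '_RAW' appended to every column name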
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.suffix = kwargs.pop('suffix', None)
self.params = dict(kwargs)
def load(self):
df = pd.read_csv(**self.params) if self.params else None
if self.suffix:
df = df.add_suffix(str(self.suffix))
self.schema.context = "pandas"
return df
class OneTickBarsDatafeed(BaseDatafeed):
"""
OneTick datafeed with bars (Open, High, Low, Close, Volume, Trade Count).
Parameters
----------
db : str
Name of the database to use.
Default: 'NYSE_TAQ_BARS'.
tick_type: str
Tick type to load.
Default: 'TRD_1M'.
symbols: List[str]
List of symbols to load.
Default: ['AAPL'].
start: otp.datetime
Start datetime.
Default: `otp.dt(2022, 3, 1, 9, 30)`
end: otp.datetime
End datetime.
Default: `otp.dt(2022, 3, 10, 16, 0)`
bucket: int
Bucket size (in seconds) used to aggregate data (timeframe).
Default: 600.
bucket_time: str
Bucket time to use: `start` or `end`.
Default: `start`.
timezone: str
Timezone to use.
Default: 'EST5EDT'.
suffix : str
Add a suffix to all columns of the result dataframe.
Default: None.
apply_times_daily: bool
If True, apply the start/end times of day to every day in the date range, skipping data outside of those intraday times.
Default: True.
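Examples
--------
A minimal sketch (requires access to a OneTick installation that serves the referenced database)::

    feed = OneTickBarsDatafeed(db='NYSE_TAQ_BARS',
                               symbols=['AAPL', 'MSFT'],
                               start=otp.dt(2022, 3, 1, 9, 30),
                               end=otp.dt(2022, 3, 10, 16, 0),
                               bucket=600)
    df = feed.load()
    # 10-minute OHLCV bars with TRADE_COUNT and SYMBOL_NAME, indexed by 'Time'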
"""
def __init__(self, **kwargs):
defaults = dict(db='NYSE_TAQ_BARS',
tick_type='TRD_1M',
symbols=['AAPL'],
start=otp.dt(2022, 3, 1, 9, 30),
end=otp.dt(2022, 3, 10, 16, 0),
bucket=600,
bucket_time="start",
timezone='EST5EDT',
apply_times_daily=True)
defaults.update(kwargs)
super().__init__(**defaults)
self.db = defaults.get('db')
self.tick_type = defaults.get('tick_type')
self.symbols = defaults.get('symbols')
self.start = defaults.get('start')
self.end = defaults.get('end')
self.bucket = defaults.get('bucket')
self.timezone = defaults.get('timezone')
self.bucket_time = defaults.get('bucket_time')
self.suffix = defaults.get('suffix', None)
self.apply_times_daily = defaults.get('apply_times_daily')
def load(self, *args):
"""
Main method used to load data.
Returns
-------
result: pd.DataFrame
Loaded data
"""
data = otp.DataSource(db=self.db,
tick_type=self.tick_type)
data["VOLUME"] = data["VOLUME"].apply(float)
data, _ = data[data['TRADE_TICK_COUNT'] > 0]
data = data.agg({'OPEN': otp.agg.first(data['FIRST']),
'HIGH': otp.agg.max(data['HIGH']),
'LOW': otp.agg.min(data['LOW']),
'CLOSE': otp.agg.last(data['LAST']),
'VOLUME': otp.agg.sum(data['VOLUME']),
'TRADE_COUNT': otp.agg.sum(data['TRADE_TICK_COUNT'])},
bucket_time=self.bucket_time,
bucket_interval=self.bucket)
data = otp.functions.corp_actions(data,
adjustment_date=int(self.end.strftime('%Y%m%d')),
adjustment_date_tz="GMT",
adjust_rule='SIZE',
fields='VOLUME')
data = otp.functions.corp_actions(data,
adjustment_date=int(self.end.strftime('%Y%m%d')),
adjustment_date_tz="GMT",
adjust_rule='PRICE',
fields='OPEN,HIGH,LOW,CLOSE')
# filter out data outside of the specified times
if self.apply_times_daily:
data = data.time_filter(start_time=self.start.strftime('%H%M%S%f')[:-3],
end_time=self.end.strftime('%H%M%S%f')[:-3],
timezone=self.timezone)
merged_data = otp.merge([data], symbols=self.symbols, identify_input_ts=True)
merged_data, _ = merged_data[merged_data['TRADE_COUNT'] > 0]
merged_data = merged_data[['OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOLUME', 'TRADE_COUNT', 'SYMBOL_NAME']]
df = otp.run(merged_data,
# apply_times_daily=self.apply_times_daily,
start=self.start,
# the minute bar for 9:30-9:31 has the timestamp of 9:31
end=self.end,
symbol_date=self.end,
# the minute bar for 3:59-4:00 has the timestamp of 4:00 but the end timestamp is not included
timezone=self.timezone)
df = df.set_index('Time')
if self.suffix:
df = df.add_suffix(str(self.suffix))
self.schema.context = "pandas"
return df
class SelectFeatures(BasePipelineOperator):
"""
Class that selects the specified columns as features.
Parameters
----------
columns : list
List of columns that are defined as features.
override : bool
If True, override the existing list of feature columns.
If False, add the columns to the existing list of feature columns.
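Examples
--------
A minimal sketch (the operator is normally run as a pipeline step that supplies `self.schema`; `df` is a hypothetical bars DataFrame)::

    select = SelectFeatures(columns=['OPEN', 'HIGH', 'LOW', 'CLOSE'])
    df = select.transform_pandas(df)
    # registers the columns as features in the schema; the data itself is unchanged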
"""
def __init__(self, columns=None, override=False):
if columns is None:
raise ValueError("columns must be specified for using SelectFeatures")
super().__init__(columns=columns, override=override)
self.override = override
def transform_pandas(self, df: pd.DataFrame):
for col in self.column_names(df):
if col not in df.columns:
raise ValueError(f"Column {col} is not in the dataframe")
if self.override:
self.schema.features_columns = self.column_names(df)
else:
self.schema.add_features(self.column_names(df))
return df
def transform_ot(self, src: otp.Source):
for col in self.column_names(src):
if col not in src.schema:
raise ValueError(f"Column {col} is not in the otp.Source schema")
if self.override:
self.schema.features_columns = self.column_names(src)
else:
self.schema.add_features(self.column_names(src))
return src
class SelectTargets(BasePipelineOperator):
"""
Class that selects the specified columns as targets.
Parameters
----------
columns : list
List of columns that are defined as targets.
override : bool
If True, override the existing list of target columns.
If False, add the columns to the existing list of target columns.
shift : bool
DON'T USE IT YET!
If True, shift the resulting target columns by one row,
providing the **next** value of the target column for each row.
This is useful when you want to predict the next value of a time series
while using the current value as a feature.
Default: False.
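Examples
--------
A minimal sketch (run as a pipeline step that supplies `self.schema`; `df` is a hypothetical bars DataFrame)::

    targets = SelectTargets(columns=['CLOSE'])
    df = targets.transform_pandas(df)
    # adds an '__UNPROCESSED.CLOSE' copy of the column and registers 'CLOSE' as a target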
"""
def __init__(self, columns, override: bool = False, shift: bool = False):
super().__init__(columns=columns,
override=override,
shift=shift)
self.override = override
self.shift = shift
def add_targets(self, src):
columns_list = self.column_names(src)
for col in columns_list:
if col in self.schema.features_columns and not self.shift:
raise ValueError(f"Column {col} is already defined as a feature, "
"it is prohibited to define it as a target in order to avoid data leakage. "
"Set shift=True to shift the target column by one row.")
if f"__UNPROCESSED.{col}" not in self.schema.unprocessed_target_columns:
self.schema.unprocessed_target_columns += [f"__UNPROCESSED.{col}"]
if self.override:
self.schema.target_columns = columns_list
else:
self.schema.add_targets(columns_list)
def transform_pandas(self, df: pd.DataFrame):
for col in self.column_names(df):
if col not in df.columns:
raise ValueError(f"Column {col} is not in the dataframe")
if self.shift:
df[f"__UNPROCESSED.{col}"] = df[col].shift(-1)
# drop the last row, as it contains NaN in target column
# and also it contains data leakage from the future
df.drop(df.index.max(), inplace=True)
else:
df[f"__UNPROCESSED.{col}"] = df[col]
self.add_targets(df)
return df
def transform_ot(self, src: otp.Source):
for col in self.column_names(src):
if col not in src.schema:
raise ValueError(f"Column {col} is not in the otp.Source schema")
target_name = f"__UNPROCESSED.{col}"
if self.shift:
if target_name in src.schema:
src.drop(target_name, inplace=True)
src[target_name] = src[col][1]
agg = src.agg({"__TOTAL": otp.agg.count()})
src = otp.join(src, agg, on="all")
# drop the last row, as it contains NaN in target column
src.state_vars["__COUNT"] = 0
src.state_vars["__COUNT"] = src.state_vars["__COUNT"] + 1
src, _ = src[src.state_vars["__COUNT"] < src["__TOTAL"]]
src.drop("__TOTAL", inplace=True)
else:
src[target_name] = src[col]
self.add_targets(src)
return src
class CalcLags(BaseFeatures):
"""
Class that calculates lag values for selected columns.
Note: when `remove_first_rows` is True, this feature processing removes the first `max(periods)` rows
from the dataframe, as there is no lagged value for them.
Parameters
----------
columns : list
List of columns for which lags are calculated.
periods : list, optional
Values of the lag to be used.
The maximum value of `periods` determines how many leading rows are removed when `remove_first_rows` is True.
Default: [1]
suffix: str, optional
The names of columns with calculated lag values have a suffix of the form: `f"{suffix}{value_of_lag}"`
Default: "_LAG_"
remove_first_rows: bool, optional
If True, remove the first `max(periods)` rows from the dataframe.
Default: False
group_by_column: bool, optional
If True and `remove_first_rows` is True, remove the first `max(periods)` rows from each group
(requires the schema group column to be set) instead of from the whole dataframe.
Default: False
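Examples
--------
A minimal sketch (run as a pipeline step; `df` is a hypothetical bars DataFrame)::

    lags = CalcLags(columns=['CLOSE'], periods=[1, 5])
    df = lags.transform_pandas(df)
    # adds 'CLOSE_LAG_1' and 'CLOSE_LAG_5' columns holding the values shifted by 1 and 5 rows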
"""
def __init__(self,
columns: Optional[list],
periods: Optional[list] = None,
remove_first_rows: bool = False,
group_by_column: bool = False,
suffix: str = '_LAG_'):
self.periods = [1] if periods is None else periods
super().__init__(columns=columns,
periods=self.periods,
remove_first_rows=remove_first_rows,
suffix=suffix,
group_by_column=group_by_column)
self.suffix = suffix
self.remove_first_rows = remove_first_rows
self.group_by_column = group_by_column
def transform_pandas(self, df: pd.DataFrame):
calculated_columns = []
group_column_name = self.schema.group_column_name
if group_column_name:
if group_column_name not in df.columns:
raise ValueError(f"column with groups `{group_column_name}` not in the input DataFrame")
grouped_df = df.groupby(group_column_name)
for column in self.column_names(df):
if column not in df.columns:
raise ValueError(f"column `{column}` not in the input DataFrame")
for lag in self.periods:
feature_col_name = f'{column}{self.suffix}{lag}'
if group_column_name:
df[feature_col_name] = grouped_df[column].shift(lag)
else:
df[feature_col_name] = df[column].shift(lag)
calculated_columns.append(feature_col_name)
if self.remove_first_rows:
# remove first periods rows, as there is no lagged value for them
if self.group_by_column:
# remove the first `max(periods)` rows from each group
df.drop(grouped_df.head(max(self.periods)).index, inplace=True)
else:
# remove the first `max(periods)` rows
df.drop(df[:max(self.periods)].index, inplace=True)
return df
class PercentageSplitter(BaseSplitting):
"""
Class for splitting data into X (features) and Y (target) sets, as well as into train-test-validate subsets
(sample membership is determined by percentage sizes for the validation and test subsets).
Parameters
----------
val_size: float
Fraction of the remaining (non-test) data to use for the validation subset.
Default: 0
test_size : float
Fraction of the data to use for the test subset.
Default: 0.15
shuffle : bool
Whether or not to shuffle the data before splitting.
Default: False
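Examples
--------
A minimal sketch (run as a pipeline step; `df` is a hypothetical bars DataFrame)::

    splitter = PercentageSplitter(val_size=0.1, test_size=0.2)
    df = splitter.transform_pandas(df)
    # adds a '__SPLIT' column labeling every row as 'TRAIN', 'VAL' or 'TEST'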
"""
def __init__(self, val_size=0, test_size=0.15, shuffle=False):
super().__init__(shuffle=shuffle, val_size=val_size, test_size=test_size)
self.shuffle = shuffle
self.test_size = test_size
self.val_size = val_size
def transform_pandas(self, df: pd.DataFrame):
if self.test_size > 0:
train_val, test = train_test_split(df.index, test_size=self.test_size, shuffle=self.shuffle)
else:
train_val = df.index
test = []
if self.val_size > 0:
train, val = train_test_split(train_val, test_size=self.val_size, shuffle=self.shuffle)
else:
val = []
df["__SPLIT"] = "TRAIN"
df.loc[test, "__SPLIT"] = "TEST"
df.loc[val, "__SPLIT"] = "VAL"
return df
class IndexSplitter(BaseSplitting):
"""
Class for splitting data into X (features) and Y (target) sets, as well as into train-test-validate subsets
(samples are determined using indexes for validation and test subsets).
Parameters
----------
val_indexes: list
The indexes of the validation subset.
Default: []
test_indexes : list
The indexes of the test subset.
Default: []
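Examples
--------
A minimal sketch (the index values are hypothetical and must exist in the DataFrame index)::

    splitter = IndexSplitter(val_indexes=['2022-03-08 10:00:00'],
                             test_indexes=['2022-03-09 10:00:00'])
    df = splitter.transform_pandas(df)
    # rows with the listed indexes are labeled 'VAL' / 'TEST' in '__SPLIT', the rest 'TRAIN'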
"""
def __init__(self, val_indexes=None, test_indexes=None):
super().__init__(val_indexes=val_indexes, test_indexes=test_indexes)
self.test_indexes = [] if test_indexes is None else test_indexes
self.val_indexes = [] if val_indexes is None else val_indexes
def transform_pandas(self, df: pd.DataFrame):
val_test_indexes = self.val_indexes + self.test_indexes
train_indexes = df[~df.index.isin(val_test_indexes)].index
# convert the list to a set because membership checks in a set are O(1), which is beneficial when working with big data
val_set = frozenset(self.val_indexes)
val_indexes = [x for x in list(df.index) if x in val_set]
test_set = frozenset(self.test_indexes)
test_indexes = [x for x in list(df.index) if x in test_set]
df["__SPLIT"] = ""
df.loc[train_indexes, "__SPLIT"] = "TRAIN"
df.loc[test_indexes, "__SPLIT"] = "TEST"
df.loc[val_indexes, "__SPLIT"] = "VAL"
return df
class TimeSplitter(BaseSplitting):
"""
Class for splitting data into X (features) and Y (target) sets, as well as into train-test-validate subsets
(samples are determined using time ranges for validation and test subsets).
Parameters
----------
datetime_column: str
Name of the column that stores the datetime values. Set '' to use the index instead.
Default: 'Time'
val_time_range: tuple
Tuple of the start and end datetimes for the validation subset.
The start time is included in the range; the end time is excluded.
Default: ()
test_time_range : tuple
Tuple of the start and end datetimes for the test subset.
The start time is included in the range; the end time is excluded.
Default: ()
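Examples
--------
A minimal sketch (the time ranges are hypothetical; pass `datetime_column=''` to split on the index)::

    splitter = TimeSplitter(datetime_column='',
                            val_time_range=('2022-03-07', '2022-03-09'),
                            test_time_range=('2022-03-09', '2022-03-11'))
    df = splitter.transform_pandas(df)
    # rows whose timestamp falls in [start, end) of a range are labeled 'VAL' / 'TEST' in '__SPLIT'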
"""
def __init__(self, datetime_column='Time', val_time_range=(), test_time_range=()):
super().__init__(datetime_column=datetime_column,
val_time_range=val_time_range, test_time_range=test_time_range)
self.datetime_column = datetime_column
self.val_time_range = val_time_range
self.test_time_range = test_time_range
def transform_ot(self, src: otp.Source) -> otp.Source:
src['__SPLIT'] = src["Time"].apply(
lambda x:
"TEST" if x >= self.test_time_range[0] and x < self.test_time_range[1] else
"VAL" if x >= self.val_time_range[0] and x < self.val_time_range[1] else
"TRAIN"
)
return src
def transform_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
# build datetime_series locally so the series is not stored in memory as an instance field,
# and so the split function can be called independently
if self.datetime_column:
assert self.datetime_column in df.columns, f"column {self.datetime_column} not in input DataFrame"
datetime_series = df[self.datetime_column]
else:
datetime_series = df.index
if self.val_time_range and self.val_time_range[0] and self.val_time_range[1]:
val_indexes = df.loc[((datetime_series >= self.val_time_range[0]) &
(datetime_series < self.val_time_range[1]))].index
else:
val_indexes = []
if self.test_time_range and self.test_time_range[0] and self.test_time_range[1]:
test_indexes = df.loc[((datetime_series >= self.test_time_range[0]) &
(datetime_series < self.test_time_range[1]))].index
else:
test_indexes = []
val_test_indexes = list(val_indexes) + list(test_indexes)
train_indexes = df[~df.index.isin(val_test_indexes)].index
# convert the list to a set because membership checks in a set are O(1), which is beneficial when working with big data
val_set = frozenset(val_indexes)
val_indexes = [x for x in list(df.index) if x in val_set]
test_set = frozenset(test_indexes)
test_indexes = [x for x in list(df.index) if x in test_set]
df["__SPLIT"] = ""
df.loc[train_indexes, "__SPLIT"] = "TRAIN"
df.loc[test_indexes, "__SPLIT"] = "TEST"
df.loc[val_indexes, "__SPLIT"] = "VAL"
return df
class FilterValues(BaseFilter):
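"""
Class that filters out rows containing unwanted values in the selected columns.
Parameters
----------
columns : list or str
Columns to check. Use "__all__" to apply the filter to all columns.
Default: "__all__"
exclude_values : list or str
Values to exclude: 'na' / 'nan' (missing values), 'zeros' (zero values), 'negatives' (negative values).
Default: 'na'
exclude_from_test_set_only : bool
If True, drop matching rows only from the 'TEST' subset (applied in the pandas transform only).
Default: False
Examples
--------
A minimal sketch (run as a pipeline step; `df` is a hypothetical bars DataFrame)::

    filt = FilterValues(columns=['VOLUME'], exclude_values=['zeros', 'na'])
    df = filt.transform_pandas(df)
    # drops rows where VOLUME is zero or missing
"""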
def __init__(self,
columns: Union[list, str] = "__all__",
exclude_values: Union[list, str] = 'na',
exclude_from_test_set_only: bool = False):
super().__init__(columns=columns,
exclude_values=exclude_values,
exclude_from_test_set_only=exclude_from_test_set_only,)
self.exclude_values = exclude_values
self.exclude_from_test_set_only = exclude_from_test_set_only
if isinstance(self.exclude_values, str):
self.exclude_values = [self.exclude_values]
def transform_ot(self, src: otp.Source):
for col in self.column_names(src):
for value in self.exclude_values:
if value == 'zeros':
src, _ = src[src[col] != 0]
elif value == 'nan' or value == 'na':
src, _ = src[src[col] != otp.nan]
elif value == 'negatives':
src, _ = src[src[col] >= 0]
return src
def transform_pandas(self, df: pd.DataFrame):
operator_map = {'zeros': lambda c: c == 0,
'negatives': lambda c: c < 0,
'nan': lambda c: c.isna(),
'na': lambda c: c.isna(),
}
for exclude_value in self.exclude_values:
_operator = operator_map[exclude_value]
for column in self.column_names(df):
if self.exclude_from_test_set_only:
# TODO hardcoded column name __SPLIT, should be a schema based value
# TODO hardcoded value TEST, should be a constant
drop_indexes = _operator(df[df["__SPLIT"] == "TEST"][column])
else:
drop_indexes = _operator(df[column])
df.drop(drop_indexes[drop_indexes].index, inplace=True)
return df