import numpy as np
from numpy.random import normal
import pandas as pd
from onetick.ml.interfaces import BaseDatafeed, BaseFeatures, BaseSplitting
from sklearn.model_selection import train_test_split
from datetime import datetime
import random
from typing import Optional, Union
import onetick.py as otp
from onetick.ml.interfaces.data_pipelines import BaseFilter, BasePipelineOperator, BasePreprocess
class RandomBarsDatafeed(BaseDatafeed):
    """
    Datafeed with randomly generated bars (Open, High, Low, Close, Volume, Time).

    Parameters
    ----------
    bars : int
        Number of bars to generate for each ticker.
        Default: 5000.
    bucket: int
        Timeframe of bars to be used (specified in seconds).
        Default: 600
    candle_body_baseval: float
        Parameter that determines the average body size of candlestick bars.
        Default: 0.01
    candle_shadow_baseval: float
        Parameter that determines the average upper/lower shadow size of candlestick bars.
        Default: 0.005
    random_state: int
        Random state (seed).
        Default None.
    group_column: str, optional
        Column name to be used for grouping (tickers, etc.)
        When set, bars are generated for five hard-coded tickers and the ticker name is
        written to this column; otherwise a single series is generated.
        Default None.
    """

    def __init__(self, bars=5000,
                 bucket=600,
                 candle_body_baseval=0.01,
                 candle_shadow_baseval=0.005,
                 random_state=None,
                 group_column=None
                 ):
        super().__init__(bars=bars,
                         bucket=bucket,
                         candle_body_baseval=candle_body_baseval,
                         candle_shadow_baseval=candle_shadow_baseval,
                         random_state=random_state,
                         group_column=group_column,
                         )
        self.bars = bars
        self.bucket = bucket
        self.candle_body_baseval = candle_body_baseval
        self.candle_shadow_baseval = candle_shadow_baseval
        self.random_state = random_state
        self.group_column = group_column

    def load(self):
        """
        Generate the random bars.

        Both the global `random` and `numpy.random` generators are re-seeded with
        `random_state` before generation.

        Returns
        -------
        pd.DataFrame
            Frame indexed by 'Time' (timestamps rendered as strings) with OPEN, HIGH, LOW,
            CLOSE and VOLUME columns, plus the `group_column` column when it is set.
        """
        start_timestamp = 1546286400
        start_price = 50.00
        min_deltas = (-0.01, 0.00, 0.01)
        mean_volume = 5000
        if self.group_column:
            tickers = ['AAPL', 'MSFT', 'AMZN', 'GOOG', 'FB']
        else:
            tickers = ['AAPL']
        # NOTE: price and volume are initialised once, before the ticker loop, so every
        # ticker after the first continues from the previous ticker's last close/volume.
        close_price = start_price
        volume = mean_volume
        res = {
            'OPEN': [],
            'HIGH': [],
            'LOW': [],
            'CLOSE': [],
            'VOLUME': [],
            'Time': [],
        }
        if self.group_column:
            res[self.group_column] = []
        random.seed(self.random_state)
        np.random.seed(self.random_state)
        for ticker in tickers:
            for i in range(self.bars):
                # NOTE: fromtimestamp() without a tz converts to the machine's local time,
                # so the rendered 'Time' values depend on the local timezone.
                date = datetime.fromtimestamp(start_timestamp + i * self.bucket)
                # open at the previous close, moved by at most one cent
                open_price = close_price + random.choice(min_deltas)
                # the higher the price relative to start_price, the less likely an up-bar
                sign_price = 1 if random.random() >= open_price / (start_price * 2) else -1
                candle_body_change = abs(self.candle_body_baseval * normal(0, 1))
                candle_up_shadow_change = abs(self.candle_shadow_baseval * normal(0, 1))
                candle_down_shadow_change = abs(self.candle_shadow_baseval * normal(0, 1))
                close_price = open_price * (1 + sign_price * candle_body_change)
                high_price = max(open_price, close_price) * (1 + candle_up_shadow_change)
                low_price = min(open_price, close_price) * (1 - candle_down_shadow_change)
                # relative high/low range of this bar and the reference range it is compared with
                hl_change = high_price / low_price - 1
                hl_mean_change = self.candle_body_baseval / self.candle_shadow_baseval / 100
                hl_relative_cur = hl_change / hl_mean_change
                vol_relative_prev = volume / (mean_volume * 2)
                if hl_change > hl_mean_change:
                    sign_volume = 1 if random.random() * hl_relative_cur >= vol_relative_prev else -0.5
                else:
                    sign_volume = 0.5 if random.random() * hl_relative_cur >= vol_relative_prev else -1
                # todo: use only -1 and 1 for sign_volume, but make volume increase in proportion to hl_relative_cur
                # and volume decrease in proportion to 1/hl_relative_cur
                volume = volume * (1 + 9 * sign_volume * hl_change)
                res['OPEN'].append(round(open_price, 2))
                res['HIGH'].append(round(high_price, 2))
                res['LOW'].append(round(low_price, 2))
                res['CLOSE'].append(round(close_price, 2))
                res['VOLUME'].append(round(volume))
                res['Time'].append(str(date))
                if self.group_column:
                    res[self.group_column].append(ticker)
        df = pd.DataFrame(res).set_index('Time')
        self.schema.context = "pandas"
        return df
class CSVDatafeed(BaseDatafeed):
    """
    Datafeed class for loading data from CSV.

    Parameters
    ----------
    suffix : str
        Add a suffix to all columns of the result dataframe.
        Default: None
    **kwargs
        All remaining keyword arguments go directly to the `pd.read_csv()` function.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # `suffix` is consumed here; everything else is forwarded to pd.read_csv()
        self.suffix = kwargs.pop('suffix', None)
        self.params = dict(kwargs)

    def load(self):
        """
        Read the CSV with the stored `pd.read_csv()` arguments.

        Returns
        -------
        pd.DataFrame or None
            Loaded data with `suffix` appended to every column name when a suffix is set,
            or None when no `pd.read_csv()` arguments were given.
        """
        df = pd.read_csv(**self.params) if self.params else None
        # nothing was loaded when no read_csv arguments were given, so there is nothing to rename
        if df is not None and self.suffix:
            df = df.add_suffix(str(self.suffix))
        self.schema.context = "pandas"
        return df
class OneTickBarsDatafeed(BaseDatafeed):
    """
    OneTick datafeed with bars (Open, High, Low, Close, Volume, Trade Count).

    Parameters
    ----------
    db : str
        Name for database to use.
        Default: 'NYSE_TAQ_BARS'.
    tick_type: str
        Tick type to load.
        Default: 'TRD_1M'.
    symbols: List[str]
        List of symbols to load.
        Default: ['AAPL'].
    start: otp.datetime
        Start datetime.
        Default: `datetime(2022, 3, 1, 9, 30)`
    end: otp.datetime
        End datetime.
        Default: `datetime(2022, 3, 10, 16, 0)`
    bucket: int
        Bucket size used to aggregate data (timeframe).
        Default: 600.
    bucket_time: str
        Bucket time to use: `start` or `end`.
        Default: `start`.
    timezone: str
        Timezone to use.
        Default: 'EST5EDT'.
    suffix : str
        Add a suffix to all columns of the result dataframe.
        Default: None.
    apply_times_daily: bool
        Apply times daily to the data, skipping data outside of the specified times for all days.
        Default: True.
    """

    def __init__(self, **kwargs):
        defaults = dict(db='NYSE_TAQ_BARS',
                        tick_type='TRD_1M',
                        symbols=['AAPL'],
                        start=otp.dt(2022, 3, 1, 9, 30),
                        end=otp.dt(2022, 3, 10, 16, 0),
                        bucket=600,
                        bucket_time="start",
                        timezone='EST5EDT',
                        apply_times_daily=True)
        # user-supplied values override the defaults; the merged set is passed to the base class
        defaults.update(kwargs)
        super().__init__(**defaults)
        self.db = defaults.get('db')
        self.tick_type = defaults.get('tick_type')
        self.symbols = defaults.get('symbols')
        self.start = defaults.get('start')
        self.end = defaults.get('end')
        self.bucket = defaults.get('bucket')
        self.timezone = defaults.get('timezone')
        self.bucket_time = defaults.get('bucket_time')
        self.suffix = defaults.get('suffix')
        self.apply_times_daily = defaults.get('apply_times_daily')

    def load(self, *args):
        """
        Main method used to load data.

        Positional arguments are accepted but not used.

        Returns
        -------
        result: pd.DataFrame
           Loaded data
        """
        # corporate actions are adjusted as of the end date (YYYYMMDD as an integer)
        adjustment_date = int(self.end.strftime('%Y%m%d'))

        data = otp.DataSource(db=self.db,
                              tick_type=self.tick_type)
        data["VOLUME"] = data["VOLUME"].apply(float)
        # skip source bars without trades
        data, _ = data[data['TRADE_TICK_COUNT'] > 0]
        # re-aggregate the source bars into buckets of `self.bucket`
        data = data.agg({'OPEN': otp.agg.first(data['FIRST']),
                         'HIGH': otp.agg.max(data['HIGH']),
                         'LOW': otp.agg.min(data['LOW']),
                         'CLOSE': otp.agg.last(data['LAST']),
                         'VOLUME': otp.agg.sum(data['VOLUME']),
                         'TRADE_COUNT': otp.agg.sum(data['TRADE_TICK_COUNT'])},
                        bucket_time=self.bucket_time,
                        bucket_interval=self.bucket)
        data = otp.functions.corp_actions(data,
                                          adjustment_date=adjustment_date,
                                          adjustment_date_tz="GMT",
                                          adjust_rule='SIZE',
                                          fields='VOLUME')
        data = otp.functions.corp_actions(data,
                                          adjustment_date=adjustment_date,
                                          adjustment_date_tz="GMT",
                                          adjust_rule='PRICE',
                                          fields='OPEN,HIGH,LOW,CLOSE')
        # filter out data outside of the specified times
        if self.apply_times_daily:
            # '%H%M%S%f' minus the last three digits gives HHMMSSmmm
            data = data.time_filter(start_time=self.start.strftime('%H%M%S%f')[:-3],
                                    end_time=self.end.strftime('%H%M%S%f')[:-3],
                                    timezone=self.timezone)
        merged_data = otp.merge([data], symbols=self.symbols, identify_input_ts=True)
        # drop buckets that ended up without trades
        merged_data, _ = merged_data[merged_data['TRADE_COUNT'] > 0]
        merged_data = merged_data[['OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOLUME', 'TRADE_COUNT', 'SYMBOL_NAME']]
        # the minute bar for 9:30-9:31 has the timestamp of 9:31;
        # the minute bar for 3:59-4:00 has the timestamp of 4:00 but the end timestamp is not included
        df = otp.run(merged_data,
                     start=self.start,
                     end=self.end,
                     symbol_date=self.end,
                     timezone=self.timezone)
        df = df.set_index('Time')
        if self.suffix:
            df = df.add_suffix(str(self.suffix))
        self.schema.context = "pandas"
        return df
class SelectFeatures(BasePipelineOperator):
    """
    Class that selects the specified columns as features.

    Parameters
    ----------
    columns : list
        List of columns that are defined as features.
    override : bool
        If True, override the existing list of features columns.
        If False, adds the columns to the existing list of features columns.

    Raises
    ------
    ValueError
        If `columns` is not specified, or (on transform) if a selected column is missing
        from the input.
    """

    def __init__(self, columns=None, override=False):
        if columns is None:
            raise ValueError("columns must be specified for using SelectFeatures")
        super().__init__(columns=columns, override=override)
        self.override = override

    def _register_features(self, columns):
        """Store `columns` in the schema, replacing or extending the feature list."""
        if self.override:
            self.schema.features_columns = columns
        else:
            self.schema.add_features(columns)

    def transform_pandas(self, df: pd.DataFrame):
        """Register the selected columns as features; the dataframe is returned unchanged."""
        columns = self.column_names(df)
        for col in columns:
            if col not in df.columns:
                raise ValueError(f"Column {col} is not in the dataframe")
        self._register_features(columns)
        return df

    def transform_ot(self, src: otp.Source):
        """Register the selected columns as features; the source is returned unchanged."""
        columns = self.column_names(src)
        for col in columns:
            if col not in src.schema:
                raise ValueError(f"Column {col} is not in the otp.Source schema")
        self._register_features(columns)
        return src
class SelectTargets(BasePipelineOperator):
    """
    Class that selects the specified columns as targets.

    For every selected column a copy named `__UNPROCESSED.<column>` is added to the data
    and registered in the schema.

    Parameters
    ----------
    columns : list
        List of columns that are defined as targets.
    override : bool
        If True, override the existing list of targets columns.
        If False, adds the columns to the existing list of targets columns.
    shift : bool
        DONT USE IT YET!
        If True, shift the resulted target columns by one row,
        providing the **next** value of the target column for each row.
        It is useful for the case when we want to predict the next value of a time series,
        and we want to use the current value as a feature.
        Default is False.
    """

    def __init__(self, columns, override: bool = False, shift: bool = False):
        super().__init__(columns=columns,
                         override=override,
                         shift=shift)
        self.override = override
        self.shift = shift

    def add_targets(self, src):
        """
        Register the selected columns of `src` as targets in the schema.

        Raises
        ------
        ValueError
            If a column is already registered as a feature and `shift` is False.
        """
        columns_list = self.column_names(src)
        for col in columns_list:
            if col in self.schema.features_columns and not self.shift:
                raise ValueError(f"Column {col} is already defined as a feature, "
                                 "it is prohibited to define it as a target in order to avoid data leakage. "
                                 "Set shift=True to shift the target column by one row.")
            if f"__UNPROCESSED.{col}" not in self.schema.unprocessed_target_columns:
                self.schema.unprocessed_target_columns += [f"__UNPROCESSED.{col}"]
        if self.override:
            self.schema.target_columns = columns_list
        else:
            self.schema.add_targets(columns_list)

    def transform_pandas(self, df: pd.DataFrame):
        """
        Add the `__UNPROCESSED.<column>` target columns to `df` (in place) and register them.

        With `shift=True` the last row is removed once, after all target columns are built.
        """
        columns = self.column_names(df)
        # validate everything up front so the frame is not left half-modified on error
        for col in columns:
            if col not in df.columns:
                raise ValueError(f"Column {col} is not in the dataframe")
        for col in columns:
            if self.shift:
                df[f"__UNPROCESSED.{col}"] = df[col].shift(-1)
            else:
                df[f"__UNPROCESSED.{col}"] = df[col]
        if self.shift and len(df) > 0:
            # drop the last row, as it contains NaN in every shifted target column;
            # done once here rather than per column, which would remove one row per target.
            # NOTE: drop() is label-based, so rows sharing the last row's index label go too.
            df.drop(df.index[-1], inplace=True)
        self.add_targets(df)
        return df

    def transform_ot(self, src: otp.Source):
        """Add the `__UNPROCESSED.<column>` target fields to `src` and register them."""
        for col in self.column_names(src):
            if col not in src.schema:
                raise ValueError(f"Column {col} is not in the otp.Source schema")
            target_name = f"__UNPROCESSED.{col}"
            if self.shift:
                if target_name in src.schema:
                    src.drop(target_name, inplace=True)
                src[target_name] = src[col][1]
                # attach the total number of ticks to every tick ...
                agg = src.agg({"__TOTAL": otp.agg.count()})
                src = otp.join(src, agg, on="all")
                # drop the last row, as it contains NaN in target column
                # ... and keep only ticks whose running counter is below that total
                src.state_vars["__COUNT"] = 0
                src.state_vars["__COUNT"] = src.state_vars["__COUNT"] + 1
                src, _ = src[src.state_vars["__COUNT"] < src["__TOTAL"]]
                src.drop("__TOTAL", inplace=True)
            else:
                src[target_name] = src[col]
        self.add_targets(src)
        return src
class CalcLags(BaseFeatures):
    """
    Class that calculates lag values for selected columns.

    When the schema defines a group column, lags are calculated within each group.

    Parameters
    ----------
    columns : list
        List of columns for which lags are calculated.
    periods : list, optional
        Values of the lag to be used.
        Maximum value of `periods` is used to remove the first rows from the dataframe.
        Default: [1]
    remove_first_rows: bool, optional
        If True, remove the first `max(periods)` rows from the dataframe,
        as there is no lagged value for them.
        Default: False
    group_by_column: bool, optional
        Only used together with `remove_first_rows`: if True and the schema defines a group
        column, the first `max(periods)` rows are removed from each group instead of from
        the dataframe as a whole.
        Default: False
    suffix: str, optional
        The names of columns with calculated lag values have a suffix of the form: `f"{suffix}{value_of_lag}"`
        Default: "_LAG_"
    """

    def __init__(self,
                 columns: Optional[list],
                 periods: Optional[list] = None,
                 remove_first_rows: bool = False,
                 group_by_column: bool = False,
                 suffix: str = '_LAG_'):
        self.periods = [1] if periods is None else periods
        super().__init__(columns=columns,
                         periods=self.periods,
                         remove_first_rows=remove_first_rows,
                         suffix=suffix,
                         group_by_column=group_by_column)
        self.suffix = suffix
        self.remove_first_rows = remove_first_rows
        self.group_by_column = group_by_column

    def transform_pandas(self, df: pd.DataFrame):
        """
        Add one `<column><suffix><lag>` column per selected column and lag (in place).

        Raises
        ------
        ValueError
            If the schema's group column or a selected column is missing from `df`.
        """
        group_column_name = self.schema.group_column_name
        grouped_df = None
        if group_column_name:
            if group_column_name not in df.columns:
                raise ValueError(f"column with groups `{group_column_name}` not in the input DataFrame")
            grouped_df = df.groupby(group_column_name)
        for column in self.column_names(df):
            if column not in df.columns:
                raise ValueError(f"column `{column}` not in the input DataFrame")
            source = df[column] if grouped_df is None else grouped_df[column]
            for lag in self.periods:
                df[f'{column}{self.suffix}{lag}'] = source.shift(lag)
        # with no periods there are no lag columns and therefore no rows to remove
        if self.remove_first_rows and self.periods:
            rows_to_remove = max(self.periods)
            # per-group removal needs an actual grouping; without a group column in the
            # schema the data is treated as a single group
            if self.group_by_column and grouped_df is not None:
                # remove the first `max(periods)` rows from each group
                df.drop(grouped_df.head(rows_to_remove).index, inplace=True)
            else:
                # remove the first `max(periods)` rows
                df.drop(df[:rows_to_remove].index, inplace=True)
        return df
class PercentageSplitter(BaseSplitting):
    """
    Class for splitting data to X (features) and Y (target) sets, as well as to train-test-validate subsets
    (samples are determined using percentage size for validation and test subsets).

    Parameters
    ----------
    val_size: float
        The size of the validation subset. It is applied to the data that remains
        after the test subset has been taken out.
        Default: 0
    test_size : float
        The size of the test subset.
        Default: 0.15
    shuffle : bool
        Whether or not to shuffle the data before splitting.
        Default: False
    """

    def __init__(self, val_size=0, test_size=0.15, shuffle=False):
        super().__init__(shuffle=shuffle, val_size=val_size, test_size=test_size)
        self.shuffle = shuffle
        self.test_size = test_size
        self.val_size = val_size

    def transform_pandas(self, df: pd.DataFrame):
        """
        Add a `__SPLIT` column with the values "TRAIN", "TEST" or "VAL" to `df` (in place).

        The split is made on index labels, so rows sharing an index label get the same value.
        """
        remaining = df.index
        test = []
        val = []
        if self.test_size > 0:
            remaining, test = train_test_split(remaining, test_size=self.test_size, shuffle=self.shuffle)
        if self.val_size > 0:
            # the validation subset is carved out of what is left after the test split
            _, val = train_test_split(remaining, test_size=self.val_size, shuffle=self.shuffle)
        df["__SPLIT"] = "TRAIN"
        df.loc[test, "__SPLIT"] = "TEST"
        df.loc[val, "__SPLIT"] = "VAL"
        return df
class IndexSplitter(BaseSplitting):
    """
    Class for splitting data into X (features) and Y (target) sets, as well as into train-test-validate subsets
    (samples are determined using indexes for validation and test subsets).

    Parameters
    ----------
    val_indexes: list
        The indexes of the validation subset.
        Default: []
    test_indexes : list
        The indexes of the test subset.
        Default: []
    """

    def __init__(self, val_indexes=None, test_indexes=None):
        super().__init__(val_indexes=val_indexes, test_indexes=test_indexes)
        self.test_indexes = [] if test_indexes is None else test_indexes
        self.val_indexes = [] if val_indexes is None else val_indexes

    def transform_pandas(self, df: pd.DataFrame):
        """
        Add a `__SPLIT` column with the values "TRAIN", "TEST" or "VAL" to `df` (in place).

        Rows whose index label is in `test_indexes` become "TEST", rows whose label is in
        `val_indexes` become "VAL" (validation wins when a label is in both), all other
        rows become "TRAIN".
        """
        # list() accepts any iterable of labels (list, tuple, array, pd.Index);
        # one vectorised membership test per subset keeps the three labels consistent
        in_val = df.index.isin(list(self.val_indexes))
        in_test = df.index.isin(list(self.test_indexes))
        df["__SPLIT"] = "TRAIN"
        df.loc[in_test, "__SPLIT"] = "TEST"
        df.loc[in_val, "__SPLIT"] = "VAL"
        return df
class TimeSplitter(BaseSplitting):
    """
    Class for splitting data into X (features) and Y (target) sets, as well as into train-test-validate subsets
    (samples are determined using time ranges for validation and test subsets).

    Parameters
    ----------
    datetime_column: str
        Name of the column that stores the datetime values. Set '' for selecting index column.
        If no column has this name but the index does, the index is used.
        The OneTick transform always uses the "Time" field.
        Default: 'Time'
    val_time_range: tuple
        Tuple of the start and end datetimes for the validation subset.
        Start time is included in the range, end time is excluded.
        Default: ()
    test_time_range : tuple
        Tuple of the start and end datetimes for the test subset.
        Start time is included in the range, end time is excluded.
        Default: ()
    """

    def __init__(self, datetime_column='Time', val_time_range=(), test_time_range=()):
        super().__init__(datetime_column=datetime_column,
                         val_time_range=val_time_range, test_time_range=test_time_range)
        self.datetime_column = datetime_column
        self.val_time_range = val_time_range
        self.test_time_range = test_time_range

    @staticmethod
    def _range_is_set(time_range):
        """Return True when `time_range` holds both a start and an end value."""
        return bool(time_range and time_range[0] and time_range[1])

    def _range_mask(self, datetime_series, time_range):
        """Return a boolean array marking the values that fall into [start, end)."""
        if not self._range_is_set(time_range):
            return np.zeros(len(datetime_series), dtype=bool)
        in_range = (datetime_series >= time_range[0]) & (datetime_series < time_range[1])
        return np.asarray(in_range, dtype=bool)

    def transform_ot(self, src: otp.Source) -> otp.Source:
        """Add a `__SPLIT` field with the values "TRAIN", "TEST" or "VAL" to `src`."""
        has_test = self._range_is_set(self.test_time_range)
        has_val = self._range_is_set(self.val_time_range)
        # only ranges that are actually set are referenced, so the default empty tuples
        # no longer fail on indexing
        if has_test and has_val:
            src['__SPLIT'] = src["Time"].apply(
                lambda x:
                    "TEST" if x >= self.test_time_range[0] and x < self.test_time_range[1] else
                    "VAL" if x >= self.val_time_range[0] and x < self.val_time_range[1] else
                    "TRAIN"
            )
        elif has_test:
            src['__SPLIT'] = src["Time"].apply(
                lambda x:
                    "TEST" if x >= self.test_time_range[0] and x < self.test_time_range[1] else
                    "TRAIN"
            )
        elif has_val:
            src['__SPLIT'] = src["Time"].apply(
                lambda x:
                    "VAL" if x >= self.val_time_range[0] and x < self.val_time_range[1] else
                    "TRAIN"
            )
        else:
            src['__SPLIT'] = "TRAIN"
        return src

    def transform_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Add a `__SPLIT` column with the values "TRAIN", "TEST" or "VAL" to `df` (in place).

        Validation wins when a row falls into both ranges.

        Raises
        ------
        ValueError
            If `datetime_column` is neither a column nor the name of the index.
        """
        # the datetime values are looked up on every call rather than stored on the instance
        if not self.datetime_column:
            datetime_series = df.index
        elif self.datetime_column in df.columns:
            datetime_series = df[self.datetime_column]
        elif df.index.name == self.datetime_column:
            datetime_series = df.index
        else:
            raise ValueError(f"column {self.datetime_column} not in input DataFrame")
        # row-wise masks, so rows that merely share an index label with an in-range row
        # are not relabelled
        val_mask = self._range_mask(datetime_series, self.val_time_range)
        test_mask = self._range_mask(datetime_series, self.test_time_range)
        df["__SPLIT"] = "TRAIN"
        df.loc[test_mask, "__SPLIT"] = "TEST"
        df.loc[val_mask, "__SPLIT"] = "VAL"
        return df
class FilterValues(BaseFilter):
    """
    Class that removes rows containing unwanted values in the selected columns.

    Parameters
    ----------
    columns : list or str
        Columns that are checked.
        Default: "__all__"
    exclude_values : list or str
        Kind(s) of values to exclude: 'zeros', 'negatives', 'nan' or 'na'
        ('nan' and 'na' are handled identically).
        The pandas transform raises KeyError for any other value,
        the OneTick transform ignores it.
        Default: 'na'
    exclude_from_test_set_only : bool
        If True, the pandas transform removes only rows whose `__SPLIT` value is "TEST".
        The OneTick transform does not use this flag.
        Default: False
    """

    # row predicates used by the pandas transform, keyed by `exclude_values` entry
    _PANDAS_PREDICATES = {'zeros': lambda c: c == 0,
                          'negatives': lambda c: c < 0,
                          'nan': lambda c: c.isna(),
                          'na': lambda c: c.isna(),
                          }

    def __init__(self,
                 columns: Union[list, str] = "__all__",
                 exclude_values: Union[list, str] = 'na',
                 exclude_from_test_set_only: bool = False):
        super().__init__(columns=columns,
                         exclude_values=exclude_values,
                         exclude_from_test_set_only=exclude_from_test_set_only,)
        self.exclude_values = exclude_values
        self.exclude_from_test_set_only = exclude_from_test_set_only
        # a single value may be given as a plain string
        if isinstance(self.exclude_values, str):
            self.exclude_values = [self.exclude_values]

    def transform_ot(self, src: otp.Source):
        """Return `src` filtered so that ticks with excluded values are removed."""
        for col in self.column_names(src):
            for value in self.exclude_values:
                if value == 'zeros':
                    src, _ = src[src[col] != 0]
                elif value == 'nan' or value == 'na':
                    src, _ = src[src[col] != otp.nan]
                elif value == 'negatives':
                    src, _ = src[src[col] >= 0]
        return src

    def transform_pandas(self, df: pd.DataFrame):
        """
        Drop the rows with excluded values from `df` (in place) and return it.

        Rows are dropped by index label.
        """
        columns = self.column_names(df)
        for exclude_value in self.exclude_values:
            is_excluded = self._PANDAS_PREDICATES[exclude_value]
            for column in columns:
                if self.exclude_from_test_set_only:
                    # TODO hardcoded column name __SPLIT, should be a schema based value
                    # TODO hardcoded value TEST, should be a constant
                    candidates = df[df["__SPLIT"] == "TEST"][column]
                else:
                    candidates = df[column]
                flagged = is_excluded(candidates)
                df.drop(flagged[flagged].index, inplace=True)
        return df