Parallelization#

Here we discuss how to parallelize computation and configure a cluster with the size and instance types that you need.

import os
import onetick.py as otp
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yaml
from onetick.ml.utils.func import build_experiment, build_experiment_class
from onetick.ml.impl.data_pipelines import OneTickBarsDatafeed, CalcLags, SelectColumns, SplitXYTrainTest 
from onetick.ml.impl.evaluators import MAEEvaluator, MAPEEvaluator, R2Evaluator, SMAPEEvaluator, RMSEEvaluator
from onetick.ml.impl.experiments import Experiment
from onetick.ml.impl.models import LGBMRegressor
# Historical query window for the datafeed (~2.5 years of data).
# 9:30–16:00 presumably corresponds to the NYSE regular trading
# session — confirm against the datafeed's timezone config.
start = otp.dt(2020, 5, 10, 9, 30)
end = otp.dt(2022, 11, 10, 16, 0)


class VolumePrediction(Experiment):
    """Predict SPY bar VOLUME from its own lagged values.

    Purely declarative experiment configuration: data sourcing, feature
    engineering, train/val/test splitting, the model grid, and the
    evaluation metrics are all class attributes consumed by ``Experiment``.
    """

    # --- DATA: SPY volume bars from the NYSE TAQ bars database ---
    datafeeds = [
        OneTickBarsDatafeed(
            db="NYSE_TAQ_BARS",
            tick_type="TRD_1M",
            symbols=["SPY"],
            start=start,
            end=end,
            bucket=600,  # bucket size — presumably seconds (10-min bars); confirm
        )
    ]

    # Lagged VOLUME features. Lags 39/40 reach back roughly one full
    # trading session, assuming 10-minute bars (6.5 h = 39 bars).
    features = [CalcLags(periods=[1, 2, 3, 39, 40], columns=["VOLUME"])]

    # Hold out 15% for validation and 15% for test; VOLUME is the target.
    splitters = [
        SplitXYTrainTest(columns_to_predict=["VOLUME"], test_size=0.15, val_size=0.15)
    ]

    preprocessors = []  # features are used as-is, no scaling/encoding

    # --- MODEL: LightGBM regressor over a small hyperparameter grid ---
    models = [
        LGBMRegressor(
            init_params={
                "n_estimators": [50, 100, 250, 500],
                "num_leaves": [5, 15, 30, 50],
                "max_depth": [2, 4, 6, 10],
                "boosting_type": ["dart", "gbdt"],
                "random_state": [0],  # fixed seed for reproducible fits
            }
        )
    ]

    train_params = {"verbose": 0}

    # --- EVALUATION: standard regression error metrics ---
    evaluators = [
        MAPEEvaluator(),
        SMAPEEvaluator(),
        MAEEvaluator(),
        RMSEEvaluator(),
        R2Evaluator(),
    ]

Local Run#

%%time
# Local (single-machine) run: fetches the bars, fits the whole
# hyperparameter grid, and evaluates on the held-out test set.
exp = VolumePrediction()

exp.run()
CPU times: user 6min 18s, sys: 2.87 s, total: 6min 21s
Wall time: 7min





({'VOLUME_MAPE': 0.23639447242974013,
  'VOLUME_SMAPE': 0.22595241388062917,
  'VOLUME_MAE': 328278.3309095651,
  'VOLUME_RMSE': 576434.1904692915,
  'VOLUME_R2': 0.7418314078167656},
              VOLUME
 20951  1.143500e+06
 20952  9.688475e+05
 20953  8.713387e+05
 20954  8.777409e+05
 20955  1.049195e+06
 ...             ...
 24637  1.519634e+06
 24638  1.762656e+06
 24639  1.887571e+06
 24640  2.751027e+06
 24641  7.911767e+06
 
 [3691 rows x 1 columns])
exp.models_cv_results
model mean_fit_time std_fit_time mean_score_time std_score_time params split0_test_score mean_test_score std_test_score
0 LGBMRegressor_id0 0.238921 0.0 0.003707 0.0 {'boosting_type': 'dart', 'max_depth': 2, 'n_estimators': 50, 'num_leaves': 5, 'random_state': 0} -509417.754195 -509417.754195 0.0
1 LGBMRegressor_id0 0.292608 0.0 0.003653 0.0 {'boosting_type': 'dart', 'max_depth': 2, 'n_estimators': 50, 'num_leaves': 15, 'random_state': 0} -509417.754195 -509417.754195 0.0
2 LGBMRegressor_id0 0.296359 0.0 0.003800 0.0 {'boosting_type': 'dart', 'max_depth': 2, 'n_estimators': 50, 'num_leaves': 30, 'random_state': 0} -509417.754195 -509417.754195 0.0
3 LGBMRegressor_id0 0.299425 0.0 0.003932 0.0 {'boosting_type': 'dart', 'max_depth': 2, 'n_estimators': 50, 'num_leaves': 50, 'random_state': 0} -509417.754195 -509417.754195 0.0
4 LGBMRegressor_id0 0.507102 0.0 0.081390 0.0 {'boosting_type': 'dart', 'max_depth': 2, 'n_estimators': 100, 'num_leaves': 5, 'random_state': 0} -433633.687547 -433633.687547 0.0
... ... ... ... ... ... ... ... ... ...
123 LGBMRegressor_id0 1.799885 0.0 0.094058 0.0 {'boosting_type': 'gbdt', 'max_depth': 10, 'n_estimators': 250, 'num_leaves': 50, 'random_state': 0} -413942.571220 -413942.571220 0.0
124 LGBMRegressor_id0 1.007128 0.0 0.015213 0.0 {'boosting_type': 'gbdt', 'max_depth': 10, 'n_estimators': 500, 'num_leaves': 5, 'random_state': 0} -401207.820850 -401207.820850 0.0
125 LGBMRegressor_id0 1.782854 0.0 0.094807 0.0 {'boosting_type': 'gbdt', 'max_depth': 10, 'n_estimators': 500, 'num_leaves': 15, 'random_state': 0} -409656.367605 -409656.367605 0.0
126 LGBMRegressor_id0 2.407380 0.0 0.098795 0.0 {'boosting_type': 'gbdt', 'max_depth': 10, 'n_estimators': 500, 'num_leaves': 30, 'random_state': 0} -418384.615268 -418384.615268 0.0
127 LGBMRegressor_id0 3.298256 0.0 0.107056 0.0 {'boosting_type': 'gbdt', 'max_depth': 10, 'n_estimators': 500, 'num_leaves': 50, 'random_state': 0} -426441.927821 -426441.927821 0.0

128 rows × 9 columns

2023-02-20 14:10:09,291	WARNING dataclient.py:396 -- Encountered connection issues in the data channel. Attempting to reconnect.
2023-02-20 14:10:39,495	WARNING dataclient.py:403 -- Failed to reconnect the data channel

Cluster Run#

%%capture
import os
import ray
# Point the Ray client at the remote cluster head node. ray.init() with
# no address argument reads RAY_ADDRESS from the environment.
os.environ['RAY_ADDRESS'] = "ray://172.16.104.130:10001"
# Tear down any session left over from a previous cell run before
# connecting, so init() starts from a clean state.
ray.shutdown()
ray.init()
# http://172.16.1.89:8265
2023-02-20 20:38:53,544	INFO worker.py:1230 -- Using address ray://172.16.104.130:10001 set in the environment variable RAY_ADDRESS
%%time
# Retry the task at most once if a worker dies mid-run.
@ray.remote(max_retries=1)
def remote_run():
    """Run the VolumePrediction experiment on a Ray worker and return it."""
    # Workers start with default onetick config; pin the timezone so the
    # remote query window matches the local run.
    otp.config['tz'] = 'EST5EDT'
    exp = VolumePrediction()
    exp.run()
    # The fitted experiment object is serialized back to the driver.
    return exp

# Submit the task to the cluster and block until the result arrives.
exp_remote = ray.get(remote_run.remote())
CPU times: user 221 ms, sys: 55.8 ms, total: 277 ms
Wall time: 1min 11s
#confirm the results are the same
exp.models_cv_results[['params','mean_test_score']].compare(exp_remote.models_cv_results[['params','mean_test_score']])

Data Parallelism: for cases when the training data does not fit in memory#

An example coming soon.