Parallelization#
Here we discuss how to parallelize computation and how to configure a cluster with the size and instance types that you need.
import os
import onetick.py as otp
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yaml
from onetick.ml.utils.func import build_experiment, build_experiment_class
from onetick.ml.impl.data_pipelines import OneTickBarsDatafeed, CalcLags, SelectColumns, SplitXYTrainTest
from onetick.ml.impl.evaluators import MAEEvaluator, MAPEEvaluator, R2Evaluator, SMAPEEvaluator, RMSEEvaluator
from onetick.ml.impl.experiments import Experiment
from onetick.ml.impl.models import LGBMRegressor
start = otp.dt(2020, 5, 10, 9, 30)
end = otp.dt(2022, 11, 10, 16, 0)
class VolumePrediction(Experiment):
    # DATA
    datafeeds = [
        OneTickBarsDatafeed(
            db="NYSE_TAQ_BARS",
            tick_type="TRD_1M",
            symbols=["SPY"],
            start=start,
            end=end,
            bucket=600,
        )
    ]
    features = [CalcLags(periods=[1, 2, 3, 39, 40], columns=["VOLUME"])]
    splitters = [
        SplitXYTrainTest(columns_to_predict=["VOLUME"], test_size=0.15, val_size=0.15)
    ]
    preprocessors = []

    # MODEL
    models = [
        LGBMRegressor(
            init_params={'n_estimators': [50, 100, 250, 500],
                         'num_leaves': [5, 15, 30, 50],
                         'max_depth': [2, 4, 6, 10],
                         'boosting_type': ['dart', 'gbdt'],
                         'random_state': [0]}
        )
    ]
    train_params = {"verbose": 0}

    # EVALUATION
    evaluators = [MAPEEvaluator(), SMAPEEvaluator(), MAEEvaluator(), RMSEEvaluator(), R2Evaluator()]
Local Run#
%%time
exp = VolumePrediction()
exp.run()
CPU times: user 6min 18s, sys: 2.87 s, total: 6min 21s
Wall time: 7min
({'VOLUME_MAPE': 0.23639447242974013,
'VOLUME_SMAPE': 0.22595241388062917,
'VOLUME_MAE': 328278.3309095651,
'VOLUME_RMSE': 576434.1904692915,
'VOLUME_R2': 0.7418314078167656},
VOLUME
20951 1.143500e+06
20952 9.688475e+05
20953 8.713387e+05
20954 8.777409e+05
20955 1.049195e+06
... ...
24637 1.519634e+06
24638 1.762656e+06
24639 1.887571e+06
24640 2.751027e+06
24641 7.911767e+06
[3691 rows x 1 columns])
exp.models_cv_results
| | model | mean_fit_time | std_fit_time | mean_score_time | std_score_time | params | split0_test_score | mean_test_score | std_test_score |
|---|---|---|---|---|---|---|---|---|---|
| 0 | LGBMRegressor_id0 | 0.238921 | 0.0 | 0.003707 | 0.0 | {'boosting_type': 'dart', 'max_depth': 2, 'n_estimators': 50, 'num_leaves': 5, 'random_state': 0} | -509417.754195 | -509417.754195 | 0.0 |
| 1 | LGBMRegressor_id0 | 0.292608 | 0.0 | 0.003653 | 0.0 | {'boosting_type': 'dart', 'max_depth': 2, 'n_estimators': 50, 'num_leaves': 15, 'random_state': 0} | -509417.754195 | -509417.754195 | 0.0 |
| 2 | LGBMRegressor_id0 | 0.296359 | 0.0 | 0.003800 | 0.0 | {'boosting_type': 'dart', 'max_depth': 2, 'n_estimators': 50, 'num_leaves': 30, 'random_state': 0} | -509417.754195 | -509417.754195 | 0.0 |
| 3 | LGBMRegressor_id0 | 0.299425 | 0.0 | 0.003932 | 0.0 | {'boosting_type': 'dart', 'max_depth': 2, 'n_estimators': 50, 'num_leaves': 50, 'random_state': 0} | -509417.754195 | -509417.754195 | 0.0 |
| 4 | LGBMRegressor_id0 | 0.507102 | 0.0 | 0.081390 | 0.0 | {'boosting_type': 'dart', 'max_depth': 2, 'n_estimators': 100, 'num_leaves': 5, 'random_state': 0} | -433633.687547 | -433633.687547 | 0.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 123 | LGBMRegressor_id0 | 1.799885 | 0.0 | 0.094058 | 0.0 | {'boosting_type': 'gbdt', 'max_depth': 10, 'n_estimators': 250, 'num_leaves': 50, 'random_state': 0} | -413942.571220 | -413942.571220 | 0.0 |
| 124 | LGBMRegressor_id0 | 1.007128 | 0.0 | 0.015213 | 0.0 | {'boosting_type': 'gbdt', 'max_depth': 10, 'n_estimators': 500, 'num_leaves': 5, 'random_state': 0} | -401207.820850 | -401207.820850 | 0.0 |
| 125 | LGBMRegressor_id0 | 1.782854 | 0.0 | 0.094807 | 0.0 | {'boosting_type': 'gbdt', 'max_depth': 10, 'n_estimators': 500, 'num_leaves': 15, 'random_state': 0} | -409656.367605 | -409656.367605 | 0.0 |
| 126 | LGBMRegressor_id0 | 2.407380 | 0.0 | 0.098795 | 0.0 | {'boosting_type': 'gbdt', 'max_depth': 10, 'n_estimators': 500, 'num_leaves': 30, 'random_state': 0} | -418384.615268 | -418384.615268 | 0.0 |
| 127 | LGBMRegressor_id0 | 3.298256 | 0.0 | 0.107056 | 0.0 | {'boosting_type': 'gbdt', 'max_depth': 10, 'n_estimators': 500, 'num_leaves': 50, 'random_state': 0} | -426441.927821 | -426441.927821 | 0.0 |
128 rows × 9 columns
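The grid above covers 128 hyperparameter combinations (4 × 4 × 4 × 2), which is why the local run takes several minutes. Since models_cv_results behaves as a pandas DataFrame (it is compared with DataFrame.compare() in the cluster run below), the best-scoring configuration can be pulled out directly. The snippet below is a small illustration using the column names shown in the table, not part of the original notebook.
# Scores are negated errors, so the best configuration has the largest mean_test_score.
cv = exp.models_cv_results
best = cv.sort_values("mean_test_score", ascending=False).iloc[0]
print(best["params"], best["mean_test_score"])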
Cluster Run#
%%capture
import os
import ray

# Point the Ray client at the cluster's head node.
os.environ['RAY_ADDRESS'] = "ray://172.16.104.130:10001"

# Shut down any previously started Ray session, then connect to the cluster
# (ray.init() picks up the address from RAY_ADDRESS).
ray.shutdown()
ray.init()

# http://172.16.1.89:8265
2023-02-20 20:38:53,544 INFO worker.py:1230 -- Using address ray://172.16.104.130:10001 set in the environment variable RAY_ADDRESS
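As a quick sanity check that is not part of the original run, you can ask Ray what the cluster exposes before submitting any work; both calls below are standard Ray APIs.
# Total CPUs, GPUs, and memory aggregated across all nodes in the cluster.
print(ray.cluster_resources())

# One dictionary per node; 'Alive' and 'Resources' show what each node contributes.
for node in ray.nodes():
    print(node["NodeManagerAddress"], node["Alive"], node["Resources"])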
%%time
@ray.remote(max_retries=1)
def remote_run():
    # Configure the session timezone on the remote worker, then run the experiment there.
    otp.config['tz'] = 'EST5EDT'
    exp = VolumePrediction()
    exp.run()
    return exp

# Submit the task to the cluster and block until the fitted experiment is returned.
exp_remote = ray.get(remote_run.remote())
CPU times: user 221 ms, sys: 55.8 ms, total: 277 ms
Wall time: 1min 11s
# Confirm the local and remote results are the same. DataFrame.compare() returns
# only the entries that differ, so an empty result means the two runs match.
exp.models_cv_results[['params', 'mean_test_score']].compare(exp_remote.models_cv_results[['params', 'mean_test_score']])
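The same pattern scales beyond a single task: several experiments can be submitted at once, and Ray schedules them across the cluster's workers. The sketch below is an illustration, not part of the original notebook; it assumes the datafeed can be overridden per symbol by subclassing VolumePrediction, and the extra symbols are hypothetical examples.
@ray.remote(max_retries=1)
def run_for_symbol(symbol):
    otp.config['tz'] = 'EST5EDT'

    # Hypothetical: override the class-level datafeed to target a single symbol.
    class PerSymbolPrediction(VolumePrediction):
        datafeeds = [
            OneTickBarsDatafeed(
                db="NYSE_TAQ_BARS",
                tick_type="TRD_1M",
                symbols=[symbol],
                start=start,
                end=end,
                bucket=600,
            )
        ]

    exp = PerSymbolPrediction()
    exp.run()
    return exp

# Submit all tasks up front; ray.get blocks until every experiment has finished.
futures = [run_for_symbol.remote(s) for s in ["SPY", "QQQ", "IWM"]]
experiments = ray.get(futures)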
Data Parallelism: for cases when the training data does not fit in memory#
An example coming soon.