Extend Algorithms in OpenBox
This guide will teach you how to extend the algorithms in OpenBox.
You can implement a new surrogate model, a new acquisition function, or a new acquisition function maximizer for the Bayesian Optimization Advisor, or implement an advisor with a completely new algorithm.
Extend Bayesian Optimization Advisor
Workflow of Bayesian Optimization Advisor
Let’s start by understanding the workflow of the Bayesian Optimization Advisor.
In each iteration, Advisor.get_suggestion() is called to generate a new config.
Here are the main steps in get_suggestion():
def get_suggestion(self, ...):
self.surrogate_model.train(...)
self.acquisition_function.update(...)
challengers = self.acq_optimizer.maximize(...)
...
First, the surrogate model is trained with the latest data.
Second, the acquisition function is updated with the surrogate model and additional information.
Third, the acquisition function maximizer searches for configurations that maximize the acquisition function.
Please refer to openbox/core/generic_advisor.py
for more details.
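From the caller's point of view, the advisor is driven by a simple suggest-evaluate-update loop. Below is a minimal sketch of that loop with a toy one-dimensional objective; the search space, objective function, and iteration count here are made up for illustration only:
from openbox import Advisor, Observation, space as sp

# a toy search space and objective, for illustration only
space = sp.Space()
space.add_variables([sp.Real("x", -5.0, 5.0, default_value=0.0)])

def objective(config):
    return [config["x"] ** 2]  # a single objective to be minimized

advisor = Advisor(space, task_id="workflow_demo")
for _ in range(10):
    config = advisor.get_suggestion()   # train surrogate, update acq. function, maximize it
    obs = Observation(config=config, objectives=objective(config))
    advisor.update_observation(obs)     # feed the result back into the advisor's history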
Implement a New Surrogate Model
To implement a new surrogate model, inherit the AbstractModel class.
The methods _train() and _predict() should be implemented.
For _train(), the surrogate model should be updated with the latest data.
For _predict(), a new batch of data is given, and the method should return the predicted mean and variance of the batch.
Please refer to openbox/surrogate/base/base_model.py for more details.
Here is an example that implements a surrogate using RandomForest in scikit-learn:
import typing
import numpy as np
from sklearn.ensemble import RandomForestRegressor
import threading
from joblib import Parallel, delayed
from sklearn.utils.fixes import _joblib_parallel_args  # note: requires an older scikit-learn version that still provides this helper
from sklearn.utils.validation import check_is_fitted
from sklearn.ensemble._base import _partition_estimators
from openbox.surrogate.base.base_model import AbstractModel
def _collect_prediction(predict, X, out, lock):
"""
This is a utility function for joblib's Parallel.
It can't go locally in ForestClassifier or ForestRegressor, because joblib
complains that it cannot pickle it when placed there.
"""
prediction = predict(X, check_input=False)
with lock:
out.append(prediction)
class skRandomForestWithInstances(AbstractModel):
def __init__(self, types: np.ndarray,
bounds: typing.List[typing.Tuple[float, float]],
num_trees: int=10,
do_bootstrapping: bool=True,
n_points_per_tree: int=-1,
ratio_features: float=5. / 6.,
min_samples_split: int=3,
min_samples_leaf: int=3,
max_depth: int=2**20,
eps_purity: float=1e-8,
max_num_nodes: int=2**20,
seed: int=42,
n_jobs: int=None,
**kwargs):
"""
Parameters
----------
types : np.ndarray (D)
Specifies the number of categorical values of an input dimension where
the i-th entry corresponds to the i-th input dimension. Let's say we
            have 2 dimensions where the first dimension consists of 3 different
            categorical choices and the second dimension is continuous, then we
            have to pass np.array([2, 0]). Note that we count starting from 0.
bounds : list
Specifies the bounds for continuous features.
num_trees : int
The number of trees in the random forest.
do_bootstrapping : bool
Turns on / off bootstrapping in the random forest.
n_points_per_tree : int
            Number of points per tree. If <= 0, X.shape[0] will be used
            in _train(X, y) instead.
ratio_features : float
The ratio of features that are considered for splitting.
min_samples_split : int
The minimum number of data points to perform a split.
min_samples_leaf : int
The minimum number of data points in a leaf.
max_depth : int
The maximum depth of a single tree.
eps_purity : float
The minimum difference between two target values to be considered
            different.
max_num_nodes : int
            The maximum total number of nodes in a tree.
seed : int
            The seed for the random forest's random number generator.
n_jobs : int, default=None
The number of jobs to run in parallel. :meth:`fit`, :meth:`predict`,
:meth:`decision_path` and :meth:`apply` are all parallelized over the
trees. ``None`` means 1 unless in a :obj:`joblib.parallel_backend`
context. ``-1`` means using all processors. See :term:`Glossary
<n_jobs>` for more details.
"""
super().__init__(types, bounds, **kwargs)
self.rng = np.random.RandomState(seed)
self.num_trees = num_trees
self.do_bootstrapping = do_bootstrapping
max_features = None if ratio_features > 1.0 else \
int(max(1, types.shape[0] * ratio_features))
self.max_features = max_features
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.max_depth = max_depth
self.epsilon_purity = eps_purity
self.max_num_nodes = max_num_nodes
self.n_points_per_tree = n_points_per_tree
self.n_jobs = n_jobs
self.rf = None # type: RandomForestRegressor
def _train(self, X: np.ndarray, y: np.ndarray):
"""Trains the random forest on X and y.
Parameters
----------
X : np.ndarray [n_samples, n_features]
Input data points.
        y : np.ndarray [n_samples, ]
The corresponding target values.
Returns
-------
self
"""
self.X = X
self.y = y.flatten()
if self.n_points_per_tree <= 0:
self.num_data_points_per_tree = self.X.shape[0]
else:
self.num_data_points_per_tree = self.n_points_per_tree
self.rf = RandomForestRegressor(
n_estimators=self.num_trees,
max_depth=self.max_depth,
min_samples_split=self.min_samples_split,
min_samples_leaf=self.min_samples_leaf,
max_features=self.max_features,
max_samples=self.num_data_points_per_tree,
max_leaf_nodes=self.max_num_nodes,
min_impurity_decrease=self.epsilon_purity,
bootstrap=self.do_bootstrapping,
n_jobs=self.n_jobs,
random_state=self.rng,
)
self.rf.fit(self.X, self.y)
return self
def _predict(self, X: np.ndarray) -> typing.Tuple[np.ndarray, np.ndarray]:
"""Predict means and variances for given X.
Parameters
----------
X : np.ndarray of shape = [n_samples, n_features]
Returns
-------
means : np.ndarray of shape = [n_samples, 1]
Predictive mean
vars : np.ndarray of shape = [n_samples, 1]
Predictive variance
"""
if len(X.shape) != 2:
raise ValueError(
'Expected 2d array, got %dd array!' % len(X.shape))
if X.shape[1] != self.types.shape[0]:
raise ValueError('Rows in X should have %d entries but have %d!' % (self.types.shape[0], X.shape[1]))
check_is_fitted(self.rf)
# Check data
if X.ndim == 1:
X = X.reshape((1, -1))
X = self.rf._validate_X_predict(X)
# Assign chunk of trees to jobs
n_jobs, _, _ = _partition_estimators(self.rf.n_estimators, self.rf.n_jobs)
# collect the output of every estimator
all_y_preds = list()
# Parallel loop
lock = threading.Lock()
Parallel(n_jobs=n_jobs, verbose=self.rf.verbose,
**_joblib_parallel_args(require="sharedmem"))(
delayed(_collect_prediction)(e.predict, X, all_y_preds, lock)
for e in self.rf.estimators_)
all_y_preds = np.asarray(all_y_preds, dtype=np.float64)
means = np.mean(all_y_preds, axis=0)
vars_ = np.var(all_y_preds, axis=0)
return means.reshape((-1, 1)), vars_.reshape((-1, 1))
Implement a New Acquisition Function
To implement a new acquisition function, inherit the AbstractAcquisitionFunction class.
The method _compute() should be implemented, which calculates the acquisition function values of a new batch of points.
Call self.model.predict_marginalized_over_instances(X) to get the predicted mean and variance of the points from the surrogate model.
Please refer to openbox/acquisition_function/acquisition.py for more details.
Here is an example of the probability of improvement (PI) acquisition function:
import numpy as np
from scipy.stats import norm
from openbox.acquisition_function import AbstractAcquisitionFunction
class PI(AbstractAcquisitionFunction):
def __init__(self,
model,
par: float = 0.0,
**kwargs):
r"""Computes the probability of improvement for a given x over the best so far value as
acquisition value.
:math:`P(f_{t+1}(\mathbf{X})\geq f(\mathbf{X^+})) :=
\Phi(\frac{\mu(\mathbf{X}) - f(\mathbf{X^+})}{\sigma(\mathbf{X})})`,
with :math:`f(X^+)` as the incumbent and :math:`\Phi` the cdf of the standard normal
Parameters
----------
        model : AbstractModel
A surrogate that implements at least
- predict_marginalized_over_instances(X)
par : float, default=0.0
Controls the balance between exploration and exploitation of the
acquisition function.
"""
super(PI, self).__init__(model)
self.long_name = 'Probability of Improvement'
self.par = par
self.eta = None
def _compute(self, X: np.ndarray, **kwargs):
"""Computes the PI value.
Parameters
----------
X: np.ndarray(N, D)
Points to evaluate PI. N is the number of points and D the dimension for the points
Returns
-------
np.ndarray(N, 1)
PI of X
"""
if self.eta is None:
raise ValueError('No current best specified. Call update('
'eta=<float>) to inform the acquisition function '
'about the current best value.')
if len(X.shape) == 1:
X = X[:, np.newaxis]
m, var_ = self.model.predict_marginalized_over_instances(X)
std = np.sqrt(var_)
return norm.cdf((self.eta - m - self.par) / std)
Implement a New Acquisition Function Maximizer
To implement a new acquisition function maximizer, inherit the AcquisitionFunctionMaximizer class.
The method _maximize() should be implemented, which returns an iterable of tuples, each consisting of an acquisition function value and a configuration.
Please refer to openbox/acq_optimizer/basic_maximizer.py for more details.
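For illustration, here is a minimal sketch of a random-search maximizer. It assumes the base class stores the acquisition function as self.acquisition_function and the search space as self.config_space, and that _maximize() receives the number of candidate points to consider, following the built-in maximizers in basic_maximizer.py; please check these names and signatures against your OpenBox version:
from openbox.acq_optimizer.basic_maximizer import AcquisitionFunctionMaximizer

class RandomSearchMaximizer(AcquisitionFunctionMaximizer):
    """Maximize the acquisition function by scoring randomly sampled configs."""

    def _maximize(self, runhistory, num_points, **kwargs):
        # draw random candidate configurations from the search space
        candidates = self.config_space.sample_configuration(size=num_points)
        if not isinstance(candidates, list):
            candidates = [candidates]
        # score all candidates with the acquisition function
        acq_values = self.acquisition_function(candidates).flatten()
        # return (acquisition value, configuration) tuples, best first
        return sorted(zip(acq_values.tolist(), candidates),
                      key=lambda pair: pair[0], reverse=True)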
Modify Advisor with Your Own Components
To use your own surrogate model / acquisition function / acquisition function maximizer,
simply replace the corresponding attribute in Advisor as follows:
from openbox import Advisor
advisor = Advisor(...)
advisor.surrogate_model = MySurrogateModel(...)
advisor.acquisition_function = MyAcquisitionFunction(...)
advisor.acq_optimizer = MyAcquisitionFunctionMaximizer(...)
This is the quickest way to use custom components.
To integrate new algorithms into the Bayesian Optimization Advisor, please modify the initialization process of Advisor, as sketched below.
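For example, a subclass can let Advisor build its defaults and then swap in the custom components during initialization. In the sketch below, MySurrogateModel, MyAcquisitionFunction, and MyAcquisitionFunctionMaximizer are the user-defined classes from the previous sections, and the attributes self.types and self.bounds are assumed to be prepared by Advisor's own initialization, so verify them against your OpenBox version:
from openbox import Advisor

class MyAdvisor(Advisor):
    """An Advisor whose default BO components are replaced at construction time."""

    def __init__(self, config_space, **kwargs):
        # let Advisor set up its default surrogate / acq. function / maximizer first
        super().__init__(config_space=config_space, **kwargs)
        # then swap in the custom components
        self.surrogate_model = MySurrogateModel(types=self.types, bounds=self.bounds)
        self.acquisition_function = MyAcquisitionFunction(model=self.surrogate_model)
        self.acq_optimizer = MyAcquisitionFunctionMaximizer(
            acquisition_function=self.acquisition_function,
            config_space=self.config_space,
        )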
Implement a New Advisor
The advisor is designed to be a config generator.
In each iteration, advisor.get_suggestion() is called to generate a new config.
After evaluation, the Observation is passed back to the advisor via advisor.update_observation().
To implement a new advisor, inherit the BaseAdvisor class.
In most cases, implementing the get_suggestion() method is sufficient.
This method should return a new config to be evaluated.
To perform extra actions when updating history data, override update_observation() in the advisor.
To suggest multiple configs at a time in parallel settings, implement get_suggestions() instead.
Here is an example of a naive random advisor without duplication check:
from openbox.core.base_advisor import BaseAdvisor
class RandomAdvisor(BaseAdvisor):
def get_suggestion(self):
config = self.config_space.sample_configuration()
return config
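As a small extension of the example above, the sketch below adds a naive duplication check and a batch get_suggestions() for parallel settings. The _suggested set, the retry budget, and the batch_size argument are illustrative choices rather than part of the BaseAdvisor API:
from openbox.core.base_advisor import BaseAdvisor

class RandomAdvisorNoDup(BaseAdvisor):
    """Random advisor that tries not to suggest the same config twice."""

    def __init__(self, config_space, **kwargs):
        super().__init__(config_space=config_space, **kwargs)
        self._suggested = set()  # configs handed out so far

    def get_suggestion(self):
        config = self.config_space.sample_configuration()
        for _ in range(100):  # simple retry budget for the duplication check
            if config not in self._suggested:
                break
            config = self.config_space.sample_configuration()
        self._suggested.add(config)
        return config

    def get_suggestions(self, batch_size=4):
        # batch variant for parallel settings
        return [self.get_suggestion() for _ in range(batch_size)]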
And here is an example of a multi-objective EHVI advisor implemented using BoTorch:
import warnings
from typing import List
import numpy as np
from ConfigSpace import UniformFloatHyperparameter, UniformIntegerHyperparameter, Configuration
import torch
from botorch.models.gp_regression import SingleTaskGP
from botorch.models.model_list_gp_regression import ModelListGP
from botorch.models.transforms.outcome import Standardize
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood
from botorch.utils.transforms import unnormalize, normalize
from botorch.utils.sampling import draw_sobol_samples
from botorch.optim.optimize import optimize_acqf
from botorch.utils.multi_objective.box_decompositions.non_dominated import (
FastNondominatedPartitioning,
)
from botorch.acquisition.multi_objective.monte_carlo import (
qExpectedHypervolumeImprovement,
)
from botorch import fit_gpytorch_mll
from botorch.exceptions import BadInitialCandidatesWarning
from botorch.sampling.normal import SobolQMCNormalSampler
from openbox import Observation, logger
from openbox.core.base_advisor import BaseAdvisor
from openbox.utils.config_space.space_utils import get_config_from_dict
class BoTorchEHVIAdvisor(BaseAdvisor):
"""
An Advisor using BoTorch's qEHVI acquisition function.
Caution: BoTorch maximizes the objectives.
"""
def __init__(
self,
config_space,
num_objectives: int,
ref_point,
init_num=-1,
NUM_RESTARTS=10,
RAW_SAMPLES=512,
MC_SAMPLES=128,
output_dir='logs',
task_id='BoTorchEHVI',
seed=None,
logger_kwargs: dict = None,
):
assert num_objectives > 1
for hp in config_space.get_hyperparameters():
assert isinstance(hp, (UniformFloatHyperparameter, UniformIntegerHyperparameter))
assert ref_point is not None
super().__init__(
config_space=config_space,
num_objectives=num_objectives,
num_constraints=0,
ref_point=ref_point,
output_dir=output_dir,
task_id=task_id,
random_state=seed,
logger_kwargs=logger_kwargs,
)
warnings.filterwarnings("ignore", category=BadInitialCandidatesWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
# random seed
np.random.seed(seed)
torch.manual_seed(seed)
self.tkwargs = {
"dtype": torch.double,
"device": torch.device("cuda" if torch.cuda.is_available() else "cpu"),
}
self.NUM_RESTARTS = NUM_RESTARTS
self.RAW_SAMPLES = RAW_SAMPLES
self.MC_SAMPLES = MC_SAMPLES
self.problem_dim = len(config_space.get_hyperparameters())
self.standard_bounds = torch.zeros(2, self.problem_dim, **self.tkwargs)
self.standard_bounds[1] = 1
self.problem_bounds = self.get_problem_bounds()
# Caution: BoTorch maximizes the objectives
self.botorch_ref_point = -1.0 * torch.tensor(ref_point, **self.tkwargs)
# initial design
self.init_num = init_num if init_num > 0 else 2 * (self.problem_dim + 1)
logger.info(f'init_num: {self.init_num}')
self.init_configs = self.generate_initial_configs(self.init_num)
def get_suggestion(self):
n_history = len(self.history)
if n_history < self.init_num:
logger.info(f'Initial iter {n_history + 1}/{self.init_num}')
return self.init_configs[n_history]
X = self.history.get_config_array(transform='numerical')
Y = self.history.get_objectives(transform='failed')
train_x = torch.tensor(X, **self.tkwargs) # train_x is not normalized
train_obj = -1.0 * torch.tensor(Y, **self.tkwargs) # Caution: BoTorch maximizes the objectives
# reinitialize the models so they are ready for fitting on next iteration
# Note: we find improved performance from not warm starting the model hyperparameters
# using the hyperparameters from the previous iteration
mll, model = self.initialize_model(train_x, train_obj)
# fit the models
fit_gpytorch_mll(mll)
# define the acquisition modules using a QMC sampler
qehvi_sampler = SobolQMCNormalSampler(sample_shape=torch.Size([self.MC_SAMPLES]))
# optimize acquisition functions and get new candidate
new_x = self.optimize_qehvi(
model, train_x, train_obj, qehvi_sampler
)
config = self.tensor2configs(new_x)[0]
logger.info(f'Get suggestion. new_x: {new_x}, config: {config}')
return config
def update_observation(self, observation: Observation):
logger.info(f'Update observation: {observation}')
return super().update_observation(observation)
def generate_initial_configs(self, init_num):
x = draw_sobol_samples(bounds=self.problem_bounds, n=init_num, q=1).squeeze(1)
configs = self.tensor2configs(x)
return configs
def initialize_model(self, train_x, train_obj):
# define models for objective
train_x = normalize(train_x, self.problem_bounds)
models = []
for i in range(train_obj.shape[-1]):
train_y = train_obj[..., i: i + 1]
model = SingleTaskGP(train_x, train_y, outcome_transform=Standardize(m=1))
models.append(model)
model = ModelListGP(*models)
mll = SumMarginalLogLikelihood(model.likelihood, model)
return mll, model
def optimize_qehvi(self, model, train_x, train_obj, sampler):
"""Optimizes the qEHVI acquisition function, and returns a new candidate."""
# partition non-dominated space into disjoint rectangles
with torch.no_grad():
pred = model.posterior(normalize(train_x, self.problem_bounds)).mean
partitioning = FastNondominatedPartitioning(
ref_point=self.botorch_ref_point,
Y=pred,
)
acq_func = qExpectedHypervolumeImprovement(
model=model,
ref_point=self.botorch_ref_point,
partitioning=partitioning,
sampler=sampler,
)
# optimize
candidates, _ = optimize_acqf(
acq_function=acq_func,
bounds=self.standard_bounds,
q=1,
num_restarts=self.NUM_RESTARTS,
            raw_samples=self.RAW_SAMPLES,  # used for initialization heuristic
options={"batch_limit": 5, "maxiter": 200},
sequential=True,
)
# observe new values
new_x = unnormalize(candidates.detach(), bounds=self.problem_bounds)
return new_x
def get_problem_bounds(self):
bounds = []
for hp in self.config_space.get_hyperparameters():
bounds.append((hp.lower, hp.upper))
bounds = torch.tensor(bounds, **self.tkwargs).transpose(0, 1)
return bounds
def tensor2configs(self, x: torch.Tensor) -> List[Configuration]:
"""
Convert x (torch tensor) to a list of Configurations
x should be unnormalized.
"""
assert x.dim() == 2, f'Expected 2-d tensor, got {x.dim()}'
configs = []
for i in range(x.shape[0]):
config_dict = dict()
for j, hp in enumerate(self.config_space.get_hyperparameters()):
config_dict[hp.name] = x[i, j].item()
config = get_config_from_dict(self.config_space, config_dict)
configs.append(config)
return configs
if __name__ == '__main__':
import matplotlib.pyplot as plt
from openbox.benchmark.objective_functions.synthetic import BraninCurrin
problem = BraninCurrin()
advisor = BoTorchEHVIAdvisor(
config_space=problem.config_space,
num_objectives=problem.num_objectives,
ref_point=problem.ref_point,
init_num=-1,
output_dir='logs',
task_id='BoTorchEHVI',
seed=1234,
)
n_iter = 20
for i in range(n_iter):
logger.info(f'=== Iteration {i + 1}/{n_iter} ===')
config = advisor.get_suggestion()
result = problem(config)
observation = Observation(config=config, objectives=result['objectives'])
advisor.update_observation(observation)
history = advisor.get_history()
history.plot_hypervolumes(optimal_hypervolume=problem.max_hv, logy=True)
plt.show()