# -*- coding: utf-8 -*-
# pylint:disable=anomalous-backslash-in-string
# Copyright 2022 PyePAL authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base class for PAL"""
import logging
import warnings
from copy import deepcopy
from typing import List, Union
import numpy as np
from sklearn.metrics import mean_absolute_error
from .core import (
_get_max_wt,
_get_max_wt_all,
_get_uncertainty_regions,
_pareto_classify,
_uncertainty,
_union,
)
from .validate_inputs import (
base_validate_models,
validate_beta_scale,
validate_coef_var,
validate_delta,
validate_epsilon,
validate_goals,
validate_ndim,
validate_ranges,
)
PAL_LOGGER = logging.getLogger("PALLogger")
PAL_LOGGER.setLevel(level="INFO")
CONSOLE_HANDLER = logging.StreamHandler()
CONSOLE_FORMAT = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
CONSOLE_HANDLER.setFormatter(CONSOLE_FORMAT)
PAL_LOGGER.addHandler(CONSOLE_HANDLER)
__all__ = ["PALBase", "PAL_LOGGER"]
[docs]class PALBase: # pylint:disable=too-many-instance-attributes, too-many-public-methods
"""PAL base class"""
[docs] def __init__( # pylint:disable=too-many-arguments
self,
X_design: np.array,
models: list,
ndim: int,
epsilon: Union[list, float] = 0.01,
delta: float = 0.05,
beta_scale: float = 1 / 9,
goals: List[str] = None,
coef_var_threshold: float = 3,
ranges: Union[np.ndarray, None] = None,
):
r"""Initialize the PAL instance
Args:
X_design (np.array): Design space (feature matrix)
models (list): Machine learning models
ndim (int): Number of objectives
epsilon (Union[list, float], optional): Epsilon hyperparameter.
Defaults to 0.01.
delta (float, optional): Delta hyperparameter. Defaults to 0.05.
beta_scale (float, optional): Scaling parameter for beta.
If not equal to 1, the theoretical guarantees do not necessarily hold.
Also note that the parametrization depends on the kernel type.
Defaults to 1/9.
goals (List[str], optional): If a list, provide "min" for every objective
that shall be minimized and "max" for every objective
that shall be maximized. Defaults to None, which _means
that the code maximizes all objectives.
coef_var_threshold (float, optional): Use only points with
a coefficient of variation below this threshold
in the classification step. Defaults to 3.
ranges (np.ndarray, optional): Numpy array of length ndmin,
where each element contains the value range of given objective.
If this is provided, we will use :math:`\epsilon \cdot ranges`
to computer the uncertainties of the hyperrectangles instead
of the default behavior :math:`\epsilon \cdot |\mu|`
"""
self.cross_val_points = 10 # maybe we make it an argument at some point
self.ndim = validate_ndim(ndim)
self.epsilon = validate_epsilon(epsilon, self.ndim)
self.delta = validate_delta(delta)
self.beta_scale = validate_beta_scale(beta_scale)
self.pareto_optimal = np.array([False] * len(X_design))
self.discarded = np.array([False] * len(X_design))
self.sampled = np.array([[False] * self.ndim] * len(X_design))
self.unclassified = np.array([True] * len(X_design))
self.rectangle_ups: np.array = None
self.rectangle_lows: np.array = None
self.models = base_validate_models(models)
self.iteration = 1
design_space_size = len(X_design)
self.coef_var_threshold = validate_coef_var(coef_var_threshold)
self.coef_var_mask = np.array([True] * design_space_size)
# _means/std are the model predictions
self._means: np.array = None
self.std: np.array = None
self.design_space = X_design
self.beta = None
self.goals = validate_goals(goals, ndim)
self.ranges = validate_ranges(ranges, ndim)
# self.y is what needs to be used for train/predict
# as there the data has been turned into maximization
# self._y contains the data as provided by the user
self.y = np.zeros((design_space_size, self.ndim)) # pylint:disable=invalid-name
self._y = self.y
# measurement_uncertainty is provided in update_train_set by the user
self.measurement_uncertainty = np.zeros((design_space_size, self.ndim))
self._has_train_set = False
[docs] def __repr__(self):
return f"pyepal at iteration {self.iteration}. \
{self.number_pareto_optimal_points} Pareto optimal points, \
{self.number_discarded_points} discarded points, \
{self.number_unclassified_points} unclassified points."
def _uses_fixed_epsilon(self):
if self.ranges is not None:
return True
return False
@property
def uses_fixed_epsilon(self):
r"""True if it uses the fixed epsilon :math:`\epsilon \cdot ranges`"""
return self._uses_fixed_epsilon()
def _reset(self):
self.pareto_optimal = np.array([False] * self.number_design_points)
self.discarded = np.array([False] * self.number_design_points)
self.sampled = np.array([[False] * self.ndim] * self.number_design_points)
self.unclassified = np.array([True] * self.number_design_points)
self.rectangle_ups: np.array = None
self.rectangle_lows: np.array = None
self.iteration = 1
self.coef_var_mask = np.array([True] * self.number_design_points)
# _means/std are the model predictions
self._means: np.array = None
self.std: np.array = None
self.beta = None
# self.y is what needs to be used for train/predict
# as there the data has been turned into maximization
# self._y contains the data as provided by the user
self.y = np.zeros((self.number_design_points, self.ndim)) # pylint:disable=invalid-name
self._y = self.y
# measurement_uncertainty is provided in update_train_set by the user
self.measurement_uncertainty = np.zeros((self.number_design_points, self.ndim))
self._has_train_set = False
def _reset_classification(self):
"""Resetting the mask arrays that keep track of the classifications.
But do *not* reset the sampling status"""
self.pareto_optimal = np.array([False] * self.number_design_points)
self.discarded = np.array([False] * self.number_design_points)
self.unclassified = np.array([True] * self.number_design_points)
@property
def sampled_mask(self):
"""Create a mask for the sampled points
We count a point as sampled if at least one objective has
been measured, i.e., self.sampled is a N * number objectives
array in which some columns can be false if a measurement
has not been performed"""
return self.sampled.sum(axis=1) > 0
@property
def number_design_points(self):
"""Return the number of points in the design space"""
return len(self.design_space)
@property
def pareto_optimal_points(self):
"""Return the pareto optimal points"""
return self.design_space[self.pareto_optimal]
@property
def sampled_points(self):
"""Return the sampled points"""
return self.design_space[self.sampled_indices]
@property
def discarded_points(self):
"""Return the discarded points"""
return self.design_space[self.discarded]
@property
def unclassified_points(self):
"""Return the discarded points"""
return self.design_space[self.unclassified]
@property
def pareto_optimal_indices(self):
"""Return the indices of the Pareto optimal points"""
return np.where(self.pareto_optimal)[0]
@property
def sampled_indices(self):
"""Return the indices of the sampled points"""
return np.unique(np.where(self.sampled)[0])
@property
def discarded_indices(self):
"""Return the indices of the discarded points"""
return np.where(self.discarded)[0]
@property
def unclassified_indices(self):
"""Return the indices of the unclassified points"""
return np.where(self.unclassified)[0]
@property
def number_pareto_optimal_points(self):
"""Return the number of Pareto optimal points"""
return sum(self.pareto_optimal)
@property
def number_discarded_points(self):
"""Return the number of discarded points"""
return sum(self.discarded)
@property
def number_unclassified_points(self):
"""Return the number of unclassified points"""
return sum(self.unclassified)
@property
def number_sampled_points(self):
"""Return the number of sampled points"""
return len(self.sampled_indices)
@property
def hyperrectangle_sizes(self):
"""Return the sizes of the hyperrectangles"""
return _uncertainty(self.rectangle_ups, self.rectangle_lows, self._means)
@property
def means(self):
"""Return the means predicted by the model"""
if self._means is None:
raise ValueError(
"Predicted means is None. Execute run_one_step() \
to obtain predicted means for each model."
)
return self._means * self.goals
def _update_beta(self):
"""Update beta according to section 7.2. of the epsilon-PAL paper"""
self.beta = (
self.beta_scale
* 2
* np.log(
self.ndim
* self.number_design_points
* np.square(np.pi)
* np.square(self.iteration + 1)
/ (6 * self.delta)
)
)
def _log(self):
pass
def _should_optimize_hyperparameters(self) -> bool: # pylint:disable=no-self-use
return True
def _should_reclassify(self) -> bool: # pylint:disable=no-self-use
if self.number_unclassified_points <= 1:
return True
return False
def _predict(self):
raise NotImplementedError("The predict function is not implemented")
def _set_hyperparameters(self):
pass
def _turn_to_maximization(self):
self.y = self._y * self.goals
def _train(self):
pass
def _set_data(self):
pass
def _crossvalidate(self):
sampled_original = deepcopy(self.sampled)
sampled_idx_original = self.sampled_indices
errors = []
# this step is to make the code not to complicate
# and deal with both large and small sample sizes
# in small samples sizes, we want to do leave-on-out CV
# in large samples this is too expensive,
# hence we chose a random subset, for which we
# test the model
sample_subset = np.random.choice(
sampled_idx_original,
min([self.cross_val_points, len(sampled_idx_original)]),
replace=False,
)
for sampled_idx in sample_subset:
# make sure that we do not run into errors due to np.nan
if self.sampled[sampled_idx].sum() == self.ndim:
# copy here is important, otherwise all
# points we set to False remain False
self.sampled = sampled_original.copy()
self.sampled[sampled_idx, :] = False
self._set_data()
if self._should_optimize_hyperparameters():
self._set_hyperparameters()
self._train()
self._predict()
error = mean_absolute_error(self.y[sampled_idx], self._means[sampled_idx])
errors.append(error)
self.sampled = sampled_original
return np.array(errors).mean()
[docs] def should_cross_validate(self):
"""Override for more complex cross validation schedules"""
if (self.iteration == 1) & (self.cross_val_points > 0):
return True
return False
def _update_hyperrectangles(self, new_indices: np.ndarray = None):
"""Computes new hyperrectangles based on beta,
the _means and the standard deviations.
If the iteration is > 0,
then it uses iterative intersection to ensure that the size of the
hyperrectangles is decreasing.
Args:
new_indices (np.ndarray): If provided, it will not use the
iterative intersection algorithm for these hyperrectangles.
Instead, it will just use the scaled edges based on the
model's prediction. Defaults to None.
"""
lows, ups = _get_uncertainty_regions(self._means, self.std, np.sqrt(self.beta))
if self.iteration == 1:
# initialization
self.rectangle_lows, self.rectangle_ups = lows, ups
else:
if new_indices is None:
self.rectangle_lows, self.rectangle_ups = _union(
self.rectangle_lows, self.rectangle_ups, lows, ups
)
else:
not_new = np.array(
[i for i in range(self.number_design_points) if i not in new_indices]
)
self.rectangle_lows[new_indices] = lows[new_indices]
self.rectangle_ups[new_indices] = ups[new_indices]
self.rectangle_lows[not_new], self.rectangle_ups[not_new] = _union(
self.rectangle_lows[not_new],
self.rectangle_ups[not_new],
lows[not_new],
ups[not_new],
)
def _update_coef_var_mask(self):
"""Update the mask array of elements that have variance below
the coefficient of variation threshold"""
# small workaround to avoid potential bugs due to division by zero
# this will fail if everything is zero in this case,
# I feel we might just give up for now,
# in the future we can think fo just applying a shift
if self._means.sum() != 0:
_means_no_zero = self._means.copy()
_means_no_zero[_means_no_zero == 0] = np.median(_means_no_zero)
self.coef_var_mask = np.max(self.std / _means_no_zero, axis=1) < self.coef_var_threshold
else:
mean_variation = self.std.mean()
self.coef_var_mask = np.max(self.std / mean_variation, axis=1) < self.coef_var_threshold
def _classify(self):
self._update_coef_var_mask()
if self._should_reclassify: # pylint:disable=using-constant-test
PAL_LOGGER.info("Resetting the classifications.")
self._reset_classification()
if self.uses_fixed_epsilon:
pareto_optimal, discarded, unclassified = _pareto_classify(
self.pareto_optimal[self.coef_var_mask],
self.discarded[self.coef_var_mask],
self.unclassified[self.coef_var_mask],
self.rectangle_lows[self.coef_var_mask],
self.rectangle_ups[self.coef_var_mask],
self.epsilon * self.ranges,
is_fixed_epsilon=True,
)
else:
pareto_optimal, discarded, unclassified = _pareto_classify(
self.pareto_optimal[self.coef_var_mask],
self.discarded[self.coef_var_mask],
self.unclassified[self.coef_var_mask],
self.rectangle_lows[self.coef_var_mask],
self.rectangle_ups[self.coef_var_mask],
self.epsilon,
is_fixed_epsilon=False,
)
self.pareto_optimal[self.coef_var_mask] = pareto_optimal
self.discarded[self.coef_var_mask] = discarded
self.unclassified[self.coef_var_mask] = unclassified
def _replace_by_measurements(self, replace_mean: bool = True, replace_std: bool = True):
"""Implements one "trick". Instead of using the GPR
predictions for the sampled points we use the data that
was actually measured and the actual uncertainty."""
if replace_mean:
self._means[self.sampled] = self.y[self.sampled]
if replace_std:
self.std[self.sampled] = self.measurement_uncertainty[self.sampled]
[docs] def run_one_step( # pylint:disable=too-many-arguments
self,
batch_size: int = 1,
pooling_method: str = "fro",
sample_discarded: bool = False,
use_coef_var: bool = True,
replace_mean: bool = True,
replace_std: bool = True,
) -> Union[np.array, None]:
"""[summary]
Args:
batch_size (int, optional): Number of indices that will be returned.
Defaults to 1.
pooling_method (str): Method that is used to aggregate
the uncertainty in different objectives into one scalar.
Available options are: "fro" (Frobenius/Euclidean norm), "mean",
"median". Defaults to "fro".
sample_discarded (bool): if true, it will sample from all points
and not only from the unclassified and Pareto optimal ones
use_coef_var (bool): If True, uses the coefficient of variation instead of
the unscaled rectangle sizes
replace_mean (bool): If true uses the measured _means for the sampled points
replace_std (bool): If true uses the measured standard deviation for the
sampled points
Raises:
ValueError: In case the PAL instance was not initialized with
measurements.
Returns:
Union[np.array, None]: Returns array of indices if there are
unclassified points that can be sample left.
"""
if not self._has_train_set:
raise ValueError(
"You need to provide a set of points to train the model on.\
Before the first iteration, call the update_train_set() function."
)
self._set_data()
if self.should_cross_validate():
self._compare_mae_variance()
if self._should_optimize_hyperparameters():
self._set_hyperparameters()
self._train()
self._predict()
self._update_beta()
self._replace_by_measurements(replace_mean=replace_mean, replace_std=replace_std)
self._update_hyperrectangles()
self._classify()
samples = np.array([], dtype=np.int)
if sum((self.pareto_optimal | self.unclassified) & ~self.sampled_mask) == 0:
PAL_LOGGER.info("No point that we could sample left.")
return None
if sum(self.unclassified):
for _ in range(batch_size):
sampled_idx = self.sample(
exclude_idx=samples,
pooling_method=pooling_method,
sample_discarded=sample_discarded,
use_coef_var=use_coef_var,
)
samples = np.append(samples, [sampled_idx])
self._log()
self._log()
self.iteration += 1
return samples
PAL_LOGGER.info("Done. No unclassified point left.")
return None
def _compare_mae_variance(self):
mae = np.nan_to_num(self._crossvalidate(), nan=np.inf)
self._predict()
mean_std = self.std.mean()
if mae >= mean_std:
warnings.warn(
"""The mean absolute error in crossvalidation is {:.2f},
the mean standard deviation is {:.2f}.
Your model might not be predictive and/or overconfident.
In the docs, you find hints on how to make models more robust.""".format(
mae, mean_std
),
UserWarning,
)
else:
PAL_LOGGER.info(
"""The mean absolute error in crossvalidation is
{:.2f}, the mean variance is {:.2f}.""".format(
mae, mean_std
)
)
[docs] def update_train_set(
self,
indices: np.ndarray,
measurements: np.ndarray,
measurement_uncertainty: np.ndarray = None,
):
"""Update training set following a measurement
Args:
indices (np.ndarray): Indices of design space at which
the measurements were taken
measurements (np.ndarray): Measured values, 2D array.
the length must equal the length of the indices array.
the second direction must equal the number of objectives.
If an objective is missing, provide np.nan. For example,
np.array([1, 1, np.nan])
measurement_uncertainty (np.ndarray): uncertainty in the measuremens,
if not provided (None) will be zero. If it is not None, it must be
an array with the same shape as the measurements
If an objective is missing, provide np.nan.
For example, np.array([1, 1, np.nan])
"""
self._has_train_set = True
assert measurements.shape[1] == self.ndim
assert len(indices) == len(measurements)
if measurement_uncertainty is not None:
assert measurement_uncertainty.shape == measurements.shape
else:
measurement_uncertainty = np.zeros(measurements.shape)
self._y[indices] = measurements
self.measurement_uncertainty[indices] = measurement_uncertainty
# This sets "sampled" to False for the objectives that have not been measured
self.sampled[indices] = ~np.isnan(measurements)
self._turn_to_maximization()
[docs] def augment_design_space( # pylint: disable=invalid-name
self, X_design: np.ndarray, classify: bool = False, clean_classify: bool = True
) -> None:
"""Add new design points to PAL instance
Args:
X_design (np.ndarrary): Design matrix. Two-dimensional array containing
measurements in the rows and the features as the columns.
classify (bool): Reclassifies the new design space, using the old model.
This is, it runs inference, calculates the hyperrectangles, and runs
the classification. Does not increase the iteration count.
Note though that points that already have been classified
as Pareto-optimal will not be re-classified,
e.g., discarded---even if the new design points
dominate the existing "Pareto optimal" points.
Defaults to False.
clean_classify (bool): Reclassifies the new design space,
using the old model. This is, it runs inference,
calculates the hyperrectangles, and runs the classification.
Does not increase the iteration count.
But, in contrast to `classify` it erases all previous classifications,
before running the new classification. Hence, if some new design point
dominates a previously Pareto efficient point,
the previous Pareto optimal point will no longer be classified
as Pareto efficient.
This flag is incompatible with `classify`.
If you choose `clean_classify`, PyePAL will erase
all previous classifications,
independent of what you choose for `classify`.
Defaults to True.
"""
if self.iteration <= 1:
raise ValueError("You must run a iteration before you augment the design space")
number_old_points = self.number_design_points
number_new_points = len(X_design)
assert isinstance(X_design, np.ndarray), "You must provide a two-dimensional numpy array"
assert X_design.ndim == 2, "You must provide a two-dimensional numpy array"
if X_design.shape[1] != self.design_space.shape[1]:
raise ValueError(
"The design matrix you provided has shape {}, \
the pyepal instance uses a design matrix of shape {}.".format(
X_design.shape, self.design_space.shape
)
)
if classify and clean_classify:
warnings.warn(
"You choose both `classify` and `clean_classify`. \
PyePAL will use the `clean_classify` behavior and override \
all previous classifications.",
UserWarning,
)
classify = False
# Update the status matrices
self.pareto_optimal = np.append(
self.pareto_optimal, np.array([False] * number_new_points), 0
)
self.discarded = np.append(self.discarded, np.array([False] * number_new_points), 0)
self.sampled = np.append(
self.sampled, np.array([[False] * self.ndim] * number_new_points), 0
)
self.unclassified = np.append(self.unclassified, np.array([True] * number_new_points), 0)
self.rectangle_ups = np.append(
self.rectangle_ups, np.full([number_new_points, self.ndim], np.nan), 0
)
self.rectangle_lows = np.append(
self.rectangle_lows, np.full([number_new_points, self.ndim], np.nan), 0
)
self.coef_var_mask = np.append(self.coef_var_mask, np.array([True] * number_new_points), 0)
# _means/std are the model predictions
self._means = np.append(self._means, np.full([number_new_points, self.ndim], np.nan), 0)
self.std = np.append(self.std, np.full([number_new_points, self.ndim], np.nan), 0)
# self.y is what needs to be used for train/predict
# as there the data has been turned into maximization
self.y = np.append(self.y, np.zeros((number_new_points, self.ndim)), 0)
# self._y contains the data as provided by the user
self._y = self.y
# measurement_uncertainty is provided in update_train_set by the user
self.measurement_uncertainty = np.append(
self.measurement_uncertainty, np.zeros((number_new_points, self.ndim)), 0
)
# Update the design space
self.design_space = np.append(self.design_space, X_design, 0)
new_indices = np.arange(number_old_points, self.number_design_points)
# Make sure that the new points have the same "state" as the old ones
# This is, we can use the new design space in a proper way for sampling
# or classification
# ToDo: the bug is here that iteration !=1 wherefore it will intersect
# But this makes no sense here when we initialize the highs and lows with zeros
if classify:
self._predict()
self._replace_by_measurements()
self._update_hyperrectangles(new_indices=new_indices)
self._classify()
if clean_classify:
self.pareto_optimal = np.array([False] * self.number_design_points)
# We can be a bit more clever and only re-consider the Pareto optimal
# points. We can leave the
self.unclassified = np.array([True] * self.number_design_points)
self.unclassified[self.discarded_indices] = False
self._predict()
self._replace_by_measurements()
self._update_hyperrectangles(new_indices=new_indices)
self._classify()
[docs] def sample(
self,
exclude_idx: Union[np.array, None] = None,
pooling_method: str = "fro",
sample_discarded: bool = False,
use_coef_var: bool = True,
) -> int:
"""Runs the sampling step based on the size of the hyperrectangle.
I.e., favoring exploration.
Args:
exclude_idx (Union[np.array, None], optional):
Points in design space to exclude from sampling.
Defaults to None.
pooling_method (str): Method that is used to aggregate
the uncertainty in different objectives into one scalar.
Available options are: "fro" (Frobenius/Euclidean norm), "mean",
"median". Defaults to "fro".
sample_discarded (bool): if true, it will sample from all points
and not only from the unclassified and Pareto optimal ones
use_coef_var (bool): If True, uses the coefficient of variation instead of
the unscaled rectangle sizes
Raises:
ValueError: In case there are no uncertainty rectangles,
i.e., when the _predict has not been successfully called.
Returns:
int: Index of next point to evaluate in design space
"""
if (self.rectangle_lows is None) | (self.rectangle_ups is None):
raise ValueError(
"You need to have uncertainty rectangles\
before you can peform the sampling"
)
sampled_mask = self.sampled_mask.copy()
# This is to handle the case of batch sampling, where we do need
# to make sure that we do not sample the same points
if isinstance(exclude_idx, np.ndarray):
if len(exclude_idx) >= 1:
exclude_mask = np.zeros(len(sampled_mask), dtype=bool)
exclude_mask[exclude_idx] = True
sampled_mask += exclude_mask
if sample_discarded:
sampled_idx = _get_max_wt_all(
self.rectangle_lows,
self.rectangle_ups,
self._means,
sampled_mask,
pooling_method,
use_coef_var,
)
else:
sampled_idx = _get_max_wt(
self.rectangle_lows,
self.rectangle_ups,
self._means,
self.pareto_optimal,
self.unclassified,
sampled_mask,
pooling_method,
use_coef_var,
)
return sampled_idx