# -*- coding: utf-8 -*-
# Copyright 2022 PyePAL authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for dealing with Pareto fronts in general"""
import numpy as np
from numba import jit
from scipy.spatial import distance
from sklearn import metrics
from sklearn.cluster import KMeans
from ._hypervolume import HypervolumeIndicator
# Public API of this module.
# NOTE(review): get_nondimensional_pareto_error (defined below) is not listed
# here — presumably intentional (internal metric); confirm before exporting.
__all__ = [
"dominance_check",
"dominance_check_jitted",
"dominance_check_jitted_2",
"dominance_check_jitted_3",
"is_pareto_efficient",
"exhaust_loop",
"get_kmeans_samples",
"get_maxmin_samples",
"get_hypervolume",
]
@jit(nopython=True)
def dominance_check(point1, point2) -> bool:
    """Return True if ``point1`` dominates ``point2``.

    Assuming maximization: one point dominates another when it is at
    least as good in every objective and strictly better in at least one.
    """
    if not np.all(point1 >= point2):
        return False
    return bool(np.any(point1 > point2))
@jit(nopython=True)
def dominance_check_jitted(point: np.array, array: np.array) -> bool:
    """Return True if ``point`` dominates at least one row of ``array``.

    Rows are visited in ascending order of their first objective, so the
    rows that are easiest to dominate are tested first (early exit).
    """
    ordering = array[:, 0].argsort()
    for idx in ordering:
        if dominance_check(point, array[idx]):
            return True
    return False
@jit(nopython=True)
def dominance_check_jitted_2(array: np.array, point: np.array) -> bool:
    """Return True if any row of ``array`` dominates ``point``.

    Rows are visited in descending order of their first objective, so the
    strongest candidates are tested first (early exit).
    """
    ordering = array[:, 0].argsort()[::-1]
    for idx in ordering:
        if dominance_check(array[idx], point):
            return True
    return False
@jit(nopython=True)
def dominance_check_jitted_3(array: np.array, point: np.array, ignore_me: int) -> bool:
    """Return True if any row of ``array`` except row ``ignore_me``
    dominates ``point``.

    The row to skip is passed as a plain index because numba does not
    understand masked arrays. Rows are visited in descending order of
    their first objective.
    """
    ordering = array[:, 0].argsort()[::-1]
    # position of the ignored row after the reordering
    skip = np.where(ordering == ignore_me)[0][0]
    for pos in range(len(ordering)):
        if pos == skip:
            continue
        if dominance_check(array[ordering[pos]], point):
            return True
    return False
def is_pareto_efficient(costs: np.array, return_mask: bool = True) -> np.array:
    """Find the Pareto-efficient points (minimization of ``costs``).

    Based on https://stackoverflow.com/questions/
    32791911/fast-calculation-of-pareto-front-in-python

    Args:
        costs (np.array): An (n_points, n_costs) array
        return_mask (bool, optional): True to return a boolean mask,
            otherwise a (n_efficient_points, ) integer array of indices.
            Defaults to True.

    Returns:
        np.array: boolean mask or integer indices of the efficient points
    """
    total_points = costs.shape[0]
    surviving = np.arange(total_points)
    candidate = 0  # next index (within the shrinking arrays) to examine
    while candidate < len(costs):
        # keep every point that beats the candidate in at least one objective
        keep = np.any(costs < costs[candidate], axis=1)
        keep[candidate] = True
        surviving = surviving[keep]  # drop the dominated points
        costs = costs[keep]
        candidate = np.sum(keep[:candidate]) + 1
    if not return_mask:
        return surviving
    mask = np.zeros(total_points, dtype=bool)
    mask[surviving] = True
    return mask
def exhaust_loop(palinstance, y: np.array, batch_size: int = 1):  # pylint:disable=invalid-name
    """Helper function that takes an initialized PAL instance
    and loops the sampling until there is no unclassified point left.
    This is useful if all measurements are already taken and one
    wants to test the algorithm with different hyperparameters.

    Args:
        palinstance (PALBase): A initialized instance of a class that
            inherited from PALBase and implemented the ._train() and
            ._predict() functions
        y (np.array): Measurements.
            The number of measurements must equal the number of
            points in the design space.
        batch_size (int, optional): Number of indices that will be returned.
            Defaults to 1.

    Returns:
        None. The PAL instance is updated in place
    """
    assert palinstance.number_design_points == len(
        y
    ), "The number of points in the design space must equal the number of measurements"
    while sum(palinstance.unclassified):
        idx = palinstance.run_one_step(batch_size=batch_size)
        # run_one_step may return None when no point could be sampled
        if idx is not None:
            palinstance.update_train_set(idx, y[idx])
def get_kmeans_samples(  # pylint:disable=invalid-name
    X: np.array, n_samples: int, **kwargs
) -> np.array:
    """Get the samples that are closest to the k=n_samples centroids

    Args:
        X (np.array): Feature array, on which the KMeans clustering is run
        n_samples (int): number of samples that should be selected
        **kwargs passed to the KMeans

    Returns:
        np.array: selected_indices
    """
    assert len(X) > n_samples, (
        "The number of points that shall be selected (n_samples) "
        "needs to be smaller than the length of the feature matrix (X)"
    )
    assert n_samples > 0 and isinstance(n_samples, int), (
        "The number of points that shall be selected (n_samples) "
        "needs to be an integer greater than 0"
    )
    kmeans = KMeans(n_samples, **kwargs).fit(X)
    cluster_centers = kmeans.cluster_centers_
    # one representative per cluster: the sample closest to each centroid
    closest, _ = metrics.pairwise_distances_argmin_min(cluster_centers, X)
    return closest
def get_maxmin_samples(  # pylint:disable=invalid-name
    X: np.array,
    n_samples: int,
    metric: str = "euclidean",
    init: str = "mean",
    seed: int = None,
    **kwargs,
) -> np.array:
    """Greedy maxmin sampling, also known as Kennard-Stone sampling (1).

    Note that a greedy sampling is not guaranteed to give the ideal solution
    and the output will depend on the random initialization (if this is chosen).
    If you need a good solution, you can restart this algorithm multiple times
    with random initialization and different random seeds
    and use a coverage metric to quantify how well
    the space is covered. Some metrics are described in (2). In contrast to the
    code provided with (2) and (3) we do not consider the feature importance for the
    selection as this is typically not known beforehand.

    You might want to standardize your data before applying this sampling function.
    Some more sampling options are provided in our structure_comp (4) Python package.
    Also, this implementation here is quite memory hungry.

    References:
        (1) Kennard, R. W.; Stone, L. A. Computer Aided Design of Experiments.
        Technometrics 1969, 11 (1), 137-148.
        https://doi.org/10.1080/00401706.1969.10490666.
        (2) Moosavi, S. M.; Nandy, A.; Jablonka, K. M.; Ongari, D.; Janet,
        J. P.; Boyd, P. G.; Lee, Y.; Smit, B.; Kulik, H. J.
        Understanding the Diversity of the Metal-Organic Framework Ecosystem.
        Nature Communications 2020, 11 (1), 4068.
        https://doi.org/10.1038/s41467-020-17755-8.
        (3) Moosavi, S. M.; Chidambaram, A.; Talirz, L.; Haranczyk, M.;
        Stylianou, K. C.; Smit, B. Capturing Chemical Intuition in Synthesis
        of Metal-Organic Frameworks. Nat Commun 2019, 10 (1), 539.
        https://doi.org/10.1038/s41467-019-08483-9.
        (4) https://github.com/kjappelbaum/structure_comp

    Args:
        X (np.array): Feature array, this is the array
            that is used to perform the sampling
        n_samples (int): number of points that will be selected,
            needs to be lower than the length of X
        metric (str, optional): Distance metric to use for the maxmin calculation.
            Must be a valid option of scipy.spatial.distance.cdist.
            Defaults to 'euclidean'
        init (str, optional): either 'mean', 'median', or 'random'.
            Determines how the initial point is chosen. Defaults to 'mean'
        seed (int, optional): seed for the random number generator. Defaults to None.
        **kwargs passed to the cdist

    Returns:
        np.array: selected_indices
    """
    np.random.seed(seed)  # NOTE: this sets the *global* numpy random seed
    assert len(X) > n_samples, (
        "The number of points that shall be selected (n_samples) "
        "needs to be smaller than the length of the feature matrix (X)"
    )
    assert n_samples > 0 and isinstance(n_samples, int), (
        "The number of points that shall be selected (n_samples) "
        "needs to be an integer greater than 0"
    )
    greedy_data = []

    # choose the seed point of the greedy selection
    if init == "random":
        # high bound is exclusive; use len(X) so the last point can be drawn
        index = np.random.randint(0, len(X))
    elif init == "mean":
        index = np.argmin(np.linalg.norm(X - np.mean(X, axis=0), axis=1))
    else:
        index = np.argmin(np.linalg.norm(X - np.median(X, axis=0), axis=1))
    greedy_data.append(X[index])
    remaining = np.delete(X, index, 0)

    while len(greedy_data) < n_samples:
        # dist[i, j] = distance between remaining[i] and greedy_data[j]
        dist = distance.cdist(remaining, greedy_data, metric, **kwargs)
        # maxmin criterion: for every remaining point take its distance to
        # the *closest* already-selected point (axis=1), then pick the
        # remaining point for which this distance is largest
        greedy_index = np.argmax(np.min(dist, axis=1))
        greedy_data.append(remaining[greedy_index])
        remaining = np.delete(remaining, greedy_index, 0)

    # recover the indices of the selected rows in the original array
    greedy_indices = []
    for datum in greedy_data:
        greedy_indices.append(np.array(np.where(np.all(X == datum, axis=1)))[0])
    greedy_indices = np.concatenate(greedy_indices).ravel()
    return greedy_indices
def get_hypervolume(
    pareto_front: np.array, reference_vector: np.array, prefactor: float = -1
) -> float:
    """Compute the hypervolume indicator of a Pareto front.

    The points are multiplied by ``prefactor`` (default -1) before the
    computation: we assume that all objectives are maximized and then
    measure the enclosed area

        f1
        |
        |----|
        |    -|
        |     -|
        ------------ f2

    whereas the hypervolume code assumes the reference vector is larger
    than all points of the Pareto front; flipping the signs reconciles
    the two conventions.

    This indicator is not needed for the epsilon-PAL algorithm itself
    but only to allow tracking a metric that might help the user to see
    if the algorithm converges.
    """
    indicator = HypervolumeIndicator(reference_vector)
    return indicator.compute(pareto_front * prefactor)
@jit(nopython=True)
def get_nondimensional_pareto_error(
    y_true: np.ndarray, y_pred: np.ndarray, ranges: np.ndarray
) -> float:
    """Calculates a non-dimensional error metric,
    the scaled minimum maximum average distance of a Pareto-optimal
    point to one in the predicted set. This metric is typically used
    if the full objective space is known, i.e., when we know all y_true
    (the true Pareto front) and when we can calculate the ranges based on
    the full objective space. A use case can be to compare runs with different
    epsilon or with different algorithms in a "fairer" way as no choice of
    a hypervolume reference point is needed.

    Args:
        y_true (np.ndarray): True Pareto front
        y_pred (np.ndarray): Predicted Pareto front
        ranges (np.ndarray): Range of every objective, typically calculated
            based on the full objective space

    Returns:
        float: error metric
    """
    total = 0.0
    for true_idx in range(len(y_true)):
        # smallest (over predictions) of the per-objective worst-case
        # scaled deviation from this true Pareto point
        best = np.inf
        for pred_idx in range(len(y_pred)):
            candidate = np.max(np.abs(y_true[true_idx] - y_pred[pred_idx]) / ranges)
            if candidate < best:
                best = candidate
        total += best
    return total / len(y_true)