Source code for pyepal.pal.utils

# -*- coding: utf-8 -*-
# Copyright 2022 PyePAL authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for dealing with Pareto fronts in general"""

from typing import Optional

import numpy as np
from numba import jit
from scipy.spatial import distance
from sklearn import metrics
from sklearn.cluster import KMeans

from ._hypervolume import HypervolumeIndicator

__all__ = [
    "dominance_check",
    "dominance_check_jitted",
    "dominance_check_jitted_2",
    "dominance_check_jitted_3",
    "is_pareto_efficient",
    "exhaust_loop",
    "get_kmeans_samples",
    "get_maxmin_samples",
    "get_hypervolume",
]


@jit(nopython=True)
def dominance_check(point1, point2) -> bool:
    """One point dominates another if it is not worse in any objective
    and strictly better in at least one. This assumes that we want to maximize."""
    if np.all(point1 >= point2) and np.any(point1 > point2):
        return True
    return False

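# Illustrative usage of `dominance_check` (a minimal sketch; the example
# arrays are made up and not part of the library):
#
#     dominance_check(np.array([2.0, 3.0]), np.array([2.0, 1.0]))  # -> True
#     dominance_check(np.array([2.0, 1.0]), np.array([1.0, 2.0]))  # -> False
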
@jit(nopython=True)
def dominance_check_jitted(point: np.ndarray, array: np.ndarray) -> bool:
    """Check if point dominates any point in array"""
    arr_sorted = array[array[:, 0].argsort()]
    for i in range(len(arr_sorted)):  # pylint:disable=consider-using-enumerate
        if dominance_check(point, arr_sorted[i]):
            return True
    return False

@jit(nopython=True)
def dominance_check_jitted_2(array: np.ndarray, point: np.ndarray) -> bool:
    """Check if any point in array dominates point"""
    arr_sorted = array[array[:, 0].argsort()[::-1]]
    for i in range(len(arr_sorted)):  # pylint:disable=consider-using-enumerate
        if dominance_check(arr_sorted[i], point):
            return True
    return False

@jit(nopython=True)
def dominance_check_jitted_3(
    array: np.ndarray, point: np.ndarray, ignore_me: int
) -> bool:
    """Check if any point in array dominates point.
    We take ignore_me as an index instead of using a mask
    since numba does not understand masked arrays."""
    sorted_idx = array[:, 0].argsort()[::-1]
    # Translate the index that shall be ignored into the sorted order
    ignore_me = np.where(sorted_idx == ignore_me)[0][0]
    arr_sorted = array[sorted_idx]
    for i in range(len(arr_sorted)):  # pylint:disable=consider-using-enumerate
        if i != ignore_me:
            if dominance_check(arr_sorted[i], point):
                return True
    return False

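# Illustrative usage of the jitted dominance checks (a minimal sketch with
# made-up values; maximization is assumed throughout):
#
#     front = np.array([[1.0, 3.0], [2.0, 2.0], [3.0, 1.0]])
#     dominance_check_jitted(np.array([4.0, 4.0]), front)    # -> True
#     dominance_check_jitted_2(front, np.array([0.0, 0.0]))  # -> True
#     dominance_check_jitted_3(front, front[1], 1)           # -> False
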
def is_pareto_efficient(costs: np.ndarray, return_mask: bool = True) -> np.ndarray:
    """Find the Pareto-efficient points

    Based on https://stackoverflow.com/questions/32791911/fast-calculation-of-pareto-front-in-python

    Args:
        costs (np.ndarray): An (n_points, n_costs) array
        return_mask (bool, optional): If True, return a boolean mask.
            Otherwise, return a (n_efficient_points,) integer array of indices.
            Defaults to True.

    Returns:
        np.ndarray: Boolean mask of shape (n_points,) if return_mask is True,
            otherwise the integer indices of the Pareto-efficient points
    """
    is_efficient = np.arange(costs.shape[0])
    n_points = costs.shape[0]
    next_point_index = 0  # Next index in the is_efficient array to search for
    while next_point_index < len(costs):
        nondominated_point_mask = np.any(costs < costs[next_point_index], axis=1)
        nondominated_point_mask[next_point_index] = True
        is_efficient = is_efficient[nondominated_point_mask]  # Remove dominated points
        costs = costs[nondominated_point_mask]
        next_point_index = np.sum(nondominated_point_mask[:next_point_index]) + 1

    if return_mask:
        is_efficient_mask = np.zeros(n_points, dtype=bool)
        is_efficient_mask[is_efficient] = True
        return is_efficient_mask
    return is_efficient

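# Illustrative usage of `is_pareto_efficient` (a minimal sketch; note that the
# function works on *costs*, i.e., it assumes minimization):
#
#     costs = np.array([[1.0, 4.0], [2.0, 2.0], [3.0, 3.0], [4.0, 1.0]])
#     is_pareto_efficient(costs)  # -> array([ True,  True, False,  True])
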
def exhaust_loop(
    palinstance, y: np.ndarray, batch_size: int = 1
):  # pylint:disable=invalid-name
    """Helper function that takes an initialized PAL instance
    and loops the sampling until there is no unclassified point left.
    This is useful if all measurements are already taken and one
    wants to test the algorithm with different hyperparameters.

    Args:
        palinstance (PALBase): An initialized instance of a class that
            inherited from PALBase and implemented the ._train()
            and ._predict() functions
        y (np.ndarray): Measurements. The number of measurements must equal
            the number of points in the design space.
        batch_size (int, optional): Number of indices that will be returned.
            Defaults to 1.

    Returns:
        None. The PAL instance is updated in place
    """
    assert palinstance.number_design_points == len(y), (
        "The number of points in the design space must equal "
        "the number of measurements"
    )
    while sum(palinstance.unclassified):
        idx = palinstance.run_one_step(batch_size=batch_size)
        if idx is not None:
            palinstance.update_train_set(idx, y[idx])

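# Illustrative usage of `exhaust_loop` (a sketch; `my_pal` stands for any
# already-initialized PALBase subclass instance and `y` for the full
# measurement array -- both names are assumptions for this example):
#
#     exhaust_loop(my_pal, y, batch_size=10)  # my_pal is updated in place
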
def get_kmeans_samples(  # pylint:disable=invalid-name
    X: np.ndarray, n_samples: int, **kwargs
) -> np.ndarray:
    """Get the samples that are closest to the k=n_samples centroids

    Args:
        X (np.ndarray): Feature array on which the KMeans clustering is run
        n_samples (int): number of samples that should be selected
        **kwargs: passed to the KMeans

    Returns:
        np.ndarray: selected_indices
    """
    assert len(X) > n_samples, (
        "The number of points that shall be selected (n_samples) needs to be "
        "smaller than the length of the feature matrix (X)"
    )
    assert n_samples > 0 and isinstance(n_samples, int), (
        "The number of points that shall be selected (n_samples) needs to be "
        "an integer greater than 0"
    )

    kmeans = KMeans(n_clusters=n_samples, **kwargs).fit(X)
    cluster_centers = kmeans.cluster_centers_
    closest, _ = metrics.pairwise_distances_argmin_min(cluster_centers, X)

    return closest

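# Illustrative usage of `get_kmeans_samples` (a minimal sketch with random
# features; `random_state` is forwarded to sklearn's KMeans via **kwargs):
#
#     X = np.random.default_rng(0).normal(size=(100, 4))
#     get_kmeans_samples(X, n_samples=5, random_state=0)  # -> 5 indices into X
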
def get_maxmin_samples(  # pylint:disable=invalid-name
    X: np.ndarray,
    n_samples: int,
    metric: str = "euclidean",
    init: str = "mean",
    seed: Optional[int] = None,
    **kwargs,
) -> np.ndarray:
    """Greedy maxmin sampling, also known as Kennard-Stone sampling (1).
    Note that a greedy sampling is not guaranteed to give the ideal solution
    and the output will depend on the random initialization (if this is chosen).

    If you need a good solution, you can restart this algorithm multiple times
    with random initialization and different random seeds
    and use a coverage metric to quantify how well the space is covered.
    Some metrics are described in (2). In contrast to the code provided
    with (2) and (3) we do not consider the feature importance for the
    selection as this is typically not known beforehand.

    You might want to standardize your data before applying this sampling
    function.

    Some more sampling options are provided in our structure_comp (4)
    Python package. Also note that this implementation is quite memory hungry.

    References:
        (1) Kennard, R. W.; Stone, L. A. Computer Aided Design of Experiments.
        Technometrics 1969, 11 (1), 137-148.
        https://doi.org/10.1080/00401706.1969.10490666.
        (2) Moosavi, S. M.; Nandy, A.; Jablonka, K. M.; Ongari, D.; Janet, J. P.;
        Boyd, P. G.; Lee, Y.; Smit, B.; Kulik, H. J. Understanding the Diversity
        of the Metal-Organic Framework Ecosystem. Nature Communications 2020,
        11 (1), 4068. https://doi.org/10.1038/s41467-020-17755-8.
        (3) Moosavi, S. M.; Chidambaram, A.; Talirz, L.; Haranczyk, M.;
        Stylianou, K. C.; Smit, B. Capturing Chemical Intuition in Synthesis
        of Metal-Organic Frameworks. Nat Commun 2019, 10 (1), 539.
        https://doi.org/10.1038/s41467-019-08483-9.
        (4) https://github.com/kjappelbaum/structure_comp

    Args:
        X (np.ndarray): Feature array, this is the array
            that is used to perform the sampling
        n_samples (int): number of points that will be selected,
            needs to be lower than the length of X
        metric (str, optional): Distance metric to use for the maxmin
            calculation. Must be a valid option of scipy.spatial.distance.cdist
            ('braycurtis', 'canberra', 'chebyshev', 'cityblock', 'correlation',
            'cosine', 'dice', 'euclidean', 'hamming', 'jaccard', 'jensenshannon',
            'kulsinski', 'mahalanobis', 'matching', 'minkowski', 'rogerstanimoto',
            'russellrao', 'seuclidean', 'sokalmichener', 'sokalsneath',
            'sqeuclidean', 'wminkowski', 'yule'). Defaults to 'euclidean'
        init (str, optional): either 'mean', 'median', or 'random'. Determines
            how the initial point is chosen. Defaults to 'mean'
        seed (int, optional): seed for the random number generator.
            Defaults to None.
        **kwargs: passed to the cdist

    Returns:
        np.ndarray: selected_indices
    """
    np.random.seed(seed)
    assert len(X) > n_samples, (
        "The number of points that shall be selected (n_samples) needs to be "
        "smaller than the length of the feature matrix (X)"
    )
    assert n_samples > 0 and isinstance(n_samples, int), (
        "The number of points that shall be selected (n_samples) needs to be "
        "an integer greater than 0"
    )

    greedy_data = []

    if init == "random":
        index = np.random.randint(0, len(X))
    elif init == "mean":
        index = np.argmin(np.linalg.norm(X - np.mean(X, axis=0), axis=1))
    else:
        index = np.argmin(np.linalg.norm(X - np.median(X, axis=0), axis=1))
    greedy_data.append(X[index])
    remaining = np.delete(X, index, 0)
    while len(greedy_data) < n_samples:
        dist = distance.cdist(remaining, greedy_data, metric, **kwargs)
        # For every remaining point take the distance to its closest selected
        # point, then greedily pick the remaining point that maximizes it
        greedy_index = np.argmax(np.min(dist, axis=1))
        greedy_data.append(remaining[greedy_index])
        remaining = np.delete(remaining, greedy_index, 0)

    greedy_indices = []

    for datum in greedy_data:
        greedy_indices.append(np.array(np.where(np.all(X == datum, axis=1)))[0])

    greedy_indices = np.concatenate(greedy_indices).ravel()

    return greedy_indices

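# Illustrative usage of `get_maxmin_samples` (a minimal sketch; consider
# standardizing X first, as the docstring suggests):
#
#     X = np.random.default_rng(0).normal(size=(100, 4))
#     get_maxmin_samples(X, n_samples=5, metric="euclidean", seed=42)
#     # -> array of 5 indices into X, spread out in feature space
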
def get_hypervolume(
    pareto_front: np.ndarray, reference_vector: np.ndarray, prefactor: float = -1
) -> float:
    """Compute the hypervolume indicator of a Pareto front.

    We multiply the front by minus one as we assume that we want to maximize
    all objectives and then we calculate the area

        f1
        |
        |----|
        |     -|
        |       -|
        ------------ f2

    But the code we use for the hypervolume indicator assumes that the
    reference vector is larger than all the points in the Pareto front.
    For this reason, we flip all the signs using prefactor.

    This indicator is not needed for the epsilon-PAL algorithm itself
    but only to allow tracking a metric that might help the user
    to see if the algorithm converges.
    """
    hv_instance = HypervolumeIndicator(reference_vector)
    volume = hv_instance.compute(pareto_front * prefactor)
    return volume

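# Illustrative usage of `get_hypervolume` (a minimal sketch with a made-up
# two-objective maximization front; with the default prefactor=-1 the signs
# are flipped, so a reference vector of zeros lies above all flipped points.
# The expected value below is the hand-computed union of the dominated boxes):
#
#     front = np.array([[1.0, 3.0], [2.0, 2.0], [3.0, 1.0]])
#     get_hypervolume(front, np.array([0.0, 0.0]))  # -> 6.0
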
@jit(nopython=True)
def get_nondimensional_pareto_error(
    y_true: np.ndarray, y_pred: np.ndarray, ranges: np.ndarray
) -> float:
    """Calculate a non-dimensional error metric: the average, over the true
    Pareto-optimal points, of the minimum scaled maximum distance of a
    Pareto-optimal point to a point in the predicted set.

    This metric is typically used if the full objective space is known,
    i.e., when we know all y_true (the true Pareto front) and when we can
    calculate the ranges based on the full objective space.

    A use case can be to compare runs with different epsilon or with different
    algorithms in a "fairer" way as no choice of a hypervolume reference point
    is needed.

    Args:
        y_true (np.ndarray): True Pareto front
        y_pred (np.ndarray): Predicted Pareto front
        ranges (np.ndarray): Range of every objective, typically calculated
            based on the full objective space

    Returns:
        float: error metric
    """
    cum_distance = 0
    for i in range(len(y_true)):  # pylint:disable=consider-using-enumerate
        # For each true point, find the smallest (over the predicted points)
        # maximum scaled coordinate-wise distance
        min_max_distance = np.inf
        for j in range(len(y_pred)):  # pylint:disable=consider-using-enumerate
            current_max_distance = np.max(np.abs(y_true[i] - y_pred[j]) / ranges)
            if current_max_distance < min_max_distance:
                min_max_distance = current_max_distance
        cum_distance += min_max_distance

    return cum_distance / len(y_true)

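# Illustrative usage of `get_nondimensional_pareto_error` (a minimal sketch;
# identical true and predicted fronts give zero error):
#
#     y_true = np.array([[1.0, 3.0], [3.0, 1.0]])
#     ranges = np.array([2.0, 2.0])
#     get_nondimensional_pareto_error(y_true, y_true, ranges)  # -> 0.0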