"""A module for plotting data and results of a minimization."""
import logging
import os
from abc import ABC, abstractmethod
import corner
import IPython.display
import numpy as np
import pandas as pd
from skopt import Optimizer
[docs]
class PlotResults():
"""
A class to read in any completed refinement history file, create a Gaussain Process Optimizer
and then do sampling on the result to create a corner plot.
Parameters:
-----------
filename : str
path to the file to load in the refinement history
quantiles : list, optional
optional, list of the quantiles to be plotted on the corner plot, defaults
to [0.34, 0.5, 0.68], e.g. 1-sigma
MH_norm : float, optional
The Metropolis-Hastings normalising factor to determine if points should be
kept or not, defaults to 20
points : int, optional
Number of points to plot on the corner plot, defaults to 100,000
"""
def __init__(self, filename: str, quantiles: 'list[float]'=None,
MH_norm: float=20, points: int =100000):
self.filename = filename
self.quantiles = [0.34, 0.5, 0.68] if quantiles is None else quantiles
self.MH_norm = MH_norm
self.points = points
self.parameter_names, self.parameter_coords,\
self.minmax_coords, self.FoMs = self.get_measured_points()
# Create the optimizer
try:
# The optimizer had a default minimum of 10 points which has since been changed to 2.
# We are still using this '10' value here because we are assuming it is a good minimum
# for 'meaningful' plots, but technically anything with 2 points can be minimised and
# shouldn't cause an error.
old_optimizer_min = 10
self.optimizer = Optimizer(self.minmax_coords,"GP",
n_initial_points=min(old_optimizer_min, len(self.FoMs)),
acq_func="gp_hedge", acq_optimizer="sampling",
model_queue_size=1)
if len(self.FoMs) < old_optimizer_min:
logging.warning("You have only used %d refinement steps,"
" use a larger number for more meaningful plots.",
len(self.FoMs))
except ValueError as error:
raise ValueError("Insufficient number of refinement steps,"
" please use at least 10.") from error
# Train the optimizer
self.optimizer.tell(self.parameter_coords, self.FoMs)
[docs]
def get_measured_points(self) -> tuple:
"""Opens the dataframe in `filename` and extracts the measured parameters names, values
and associated figures of merit.
Returns:
--------
tuple of (parameter names, parameter coordinates, min and max parameters, FoM's)
"""
records = pd.read_csv(self.filename, delimiter=',')
records = records.astype(dtype=float, errors='ignore')
# Convert to float where possible (i.e. not a string)
FoMs = records['FoM'].to_list()
records = records.drop(columns=['Unnamed: 0', 'FoM', 'Change state'], errors='ignore')
# TODO this is hard coded to creation of history, may want to change
coordinates = records.values.tolist()
names = records.columns.tolist()
minmax_coordinates = [(min(np.array(coord)),
max(np.array(coord))) for coord in np.array(coordinates).T]
return names, coordinates, minmax_coordinates, FoMs
def _expected_minimum_random_sampling(self) -> 'tuple[list, float, list, list[list]]':
"""
This is almost verbatim a copy of code from scikit-optimize but with the samples as
an additional output:
https://github.com/scikit-optimize/scikit-optimize/blob/de32b5fd2205a1e58526f3cacd0422a26d315d0f/skopt/utils.py#L259
Returns
-------
min_x : list
location of the minimum.
y_random[index_best_objective] : float
the surrogate function value at the minimum.
y_random : np.array
An array of length "self.points" containing surrogate function values at each point
random_samples : list[list]
A list of length "self.points" containing the coordinates of each prediction
"""
# sample points from search space, set a random seed for reproducibility = 7 w.l.o.g.
random_samples = self.optimizer.space.rvs(self.points, random_state=7)
# make estimations with surrogate
model = self.optimizer.models[-1]
y_random = model.predict(self.optimizer.space.transform(random_samples))
index_best_objective = np.argmin(y_random)
min_x = random_samples[index_best_objective]
return min_x, y_random[index_best_objective], y_random, random_samples
def _remove_points(self, chi_squared: 'list[float]',
coords: 'list[list]') -> 'tuple[list, list]':
"""
Removes points with poor figure of merit based on a Metropolis-Hastings type rule,
where the likelihood of keeping a point is dependent on the exponent of the difference
between its figure of merit, and that of the best one found, divided by MH_norm.
Parameters
----------
chi_squared : list[float]
A list of the predicted chi-squared value at each coordinate
coords : list[list]
A list of the coordinates at which all of the chi-squared predictions are made
Returns
-------
reduced_chi : list
A list of the remaining chi-squared points
reduced_coords : list[list]
A list of the remaining coordinates
"""
np.random.seed(16) # Set for reproducible output - will always retain same points
lowest_chi = min(chi_squared)
points_to_keep = np.random.random(size=chi_squared.shape) < \
np.exp((lowest_chi - chi_squared)/(lowest_chi/self.MH_norm))
reduced_chi=chi_squared[points_to_keep]
reduced_coords = np.array(coords)[points_to_keep]
return reduced_chi, reduced_coords
[docs]
def create_cornerplot(self) -> None:
"""
Performs a random sample across the coordinate space giving a predicted figure of merit at
every point. Then removes points with poor figures of merit, according to a
Metropolis-Hastings type rule, where the likelihood of keeping a point is dependant on the
exponent of the difference between its figure of merit, and that of the best one found,
divided by MC_norm. A corner plot is then returned (a matplotlib figure object), which can
be displayed or exported.
Returns
-------
corner plot : Matplotlib.figure.Figure
A plot displaying every parameter combination with their variances and covariances
"""
try:
_, _, y_random, coords = \
self._expected_minimum_random_sampling()
except IndexError:
msg = (f"\n \n Your data file, {os.path.abspath(f'{self.filename}')},"
" appears not to have any points in, please check you have"
" run the refinement and it saved correctly. \n")
print(msg)
return None
_, reduced_coordinate_list = self._remove_points(y_random, coords)
data = np.empty(shape=np.array(reduced_coordinate_list).shape)
for i in range(np.array(reduced_coordinate_list).shape[1]):
data[:,i] = np.array(reduced_coordinate_list)[:,i]
labels = [str(name) for name in self.parameter_names]
cornerplot = corner.corner(data, labels = labels, quantiles = [0.34, 0.5, 0.68])
mean, std = np.mean(data, axis=0), np.std(data, axis=0)
return cornerplot, mean, std
[docs]
class DataPrinter(ABC):
"""
A class for printing data during a minimisation.
"""
[docs]
@abstractmethod
def print_data(self, history):
"""
Update table at the end of a refinement step.
Parameters:
history
The history of the minimizer data is printed from.
"""
raise NotImplementedError
[docs]
class PlaintextDataPrinter(DataPrinter):
"""Plaintext data printer."""
[docs]
def print_data(self, history) -> None:
with pd.option_context('display.max_colwidth', 12,
'display.precision', 5,
'display.float_format', '{:.4g}'.format):
n_step = history.iloc[-1].name
output = history.loc[[n_step]].to_string(
col_space=12, index=False, header=False).split('\n')
data = '{:4d}'.format(n_step) + ''.join(output)
print(data)
[docs]
class IPythonDataPrinter(DataPrinter):
"""Prettier IPython data printer, for Jupyter Notebooks, etc."""
def __init__(self):
self.display = IPython.display.DisplayHandle()
[docs]
def print_data(self, history) -> None:
history_table = pd.DataFrame(history,
columns=history.columns)
history_table.index.name = "Step"
self.display.update(history_table)
data_printers = {
'plaintext': PlaintextDataPrinter,
'ipython': IPythonDataPrinter,
}