"""A module for performing the refinement"""
import statistics
from copy import deepcopy
from typing import List, Dict
from contextlib import suppress
from datetime import datetime
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d, interp2d
from verbosemanager import VerboseManager
from MDMC.control.plot_results import PlotResults
from MDMC.common.decorators import repr_decorator
from MDMC.MD.parameters import Parameters
from MDMC.MD.simulation import Simulation
from MDMC.refinement.minimizers.minimizer_factory import MinimizerFactory
from MDMC.refinement.FoM.FoM_factory import FoMFactory
from MDMC.refinement.FoM.FoM_abs import ObservablePair
from MDMC.resolution.resolution_factory import ResolutionFactory
from MDMC.trajectory_analysis.observables.obs_factory \
import ObservableFactory
from MDMC.trajectory_analysis.observables.obs import Observable
[docs]@repr_decorator('simulation', 'exp_datasets', 'FoM_calculator', 'minimizer',
'reset_config', 'fit_parameters', 'MD_steps',
'verbose')
class Control:
"""
Controls the MDMC refinement
Parameters
----------
simulation : Simulation
Performs a simulation for a given set of potential ``Parameter``
objects.
exp_datasets : list of dicts
Each `dict` represents an experimental dataset, containing the
following keys:
- ``file_name`` (`str`) the file name
- ``type`` (`str`) the type of observable
- ``reader`` (`str`) the reader required for the file
- ``weighting`` (`float`) the weighting of the dataset to be used in
the Figure of Merit calculation
- ``resolution`` : (`dict`, but can be `str`, `float`, or None)
Should be a one-line dict of format {'file' : str} or {key : float}
if {'file' : str}, the str should be a file in
the same format as ``file_name`` containing results of a vanadium
sample which is used to determine instrument energy resolution for
this dataset.
If {key : float}, the key should be the type of resolution function.
allowed types are 'gaussian' and 'lorentzian'
Can also be 'lazily' given as just a `str` or `float`.
If just a string is given, it is assumed to be a filename.
If just a float is given, it is assumed to be Gaussian.
The float should be the instrument energy resolution as the FWHM in ``ueV`` (micro eV).
If you have already accounted for instrument resolution in your dataset,
this field can be set to None, and resolution application will be skipped.
This must be done explicitly.
- ``rescale_factor`` (`float`, optional, defaults to `1.`) applied to
the experimental data when calculating the FoM to ensure it is on
the same scale as the calculated observable
- ``auto_scale`` (`bool`, optional, defaults to `False`) set the
``rescale_factor`` automatically to minimise the FoM, if both
``rescale_factor`` and ``auto_scale`` are provided then a warning
is printed and ``auto_scale`` takes precedence
- ``use_FFT`` (`bool`, optional, defaults to `True`) whether to use
Fast Fourier Transforms in the calculation of dependent variables.
FFT speeds up calculation but places restrictions on spacing in the
independent variable domain(s). This option may not be supported
for all ``Observable``s
Note that the default (and preferred) behaviour of the scaling settings requires that the
dataset provided has been properly scaled and normalised for the refinement process.
Arbitrary or automatic rescaling should be undertaken with care, as it does not take into
account any physical aspects of scaling the data, such as the presence or absence of
background events from peaks outside its range.
fit_parameters : Parameters, list of Parameter
All parameters which will be refined. Note that any ``Parameter`` that is ``fixed``,
``tied`` or equal to 0 will not be passed to the minimizer as these cannot be refined.
Those with ``constraints`` set are still passed.
minimizer_type : str, optional
The ``Minimizer`` type. Default is 'MMC'.
FoM_options : dict of {str : str}, optional
Defines the details of the ``FigureOfMeritCalculator`` to use. Default is `None`, in which
case the first option for each of the following keys is used:
- ``errors`` {'exp', 'none'} Whether to weight the differences between MD and
experimental data with the experimental error or not (errors from MD are not currently
supported).
- ``norm`` {'data_points', 'dof', 'none'} How to normalise the FoM, either by the total
number of data_points (n), the degrees of freedom (n - m, where m is the number of
parameters being varied), or not at all.
reset_config : bool, optional
Determines if the configuration is reset to the end of the last accepted
state. Default is `True`.
MD_steps : int, optional
Number of molecular dynamics steps for each step of the refinement.
When not provided, the minimum number of steps needed for successful
calculation of the observables is used. If provided, the actual number
of steps may be reduced to prevent running MD that won't be used when
calculating dependent variables. Default is `None`.
equilibration_steps : int, optional
Number of molecular dynamics steps used to equilibrate the ``Universe``
in between each refinement step. When changes to the ``Parameters`` are small,
this equilibration can be much shorter than the equilibration needed before
starting the refinement process, but in general will vary depending on
the details of the ``Universe`` and ``Parameters``. Default is 0.
verbose: int, optional
The level of verbosity:
Verbose level -1 hides all outputs for tests.
Verbose level 0 gives no information.
Verbose level 1 gives final time for the whole method.
Verbose level 2 gives final time and also a progress bar.
Verbose level 3 gives final time, a progress bar, and time per step.
print_full_settings: bool, optional
Whether to print all settings/attributes/parameters passed to this object,
defaults to False.
**settings: dict, optional
n_steps: int
The maximum number of refinement steps to be. An optional parameter that,
if specified, will be used in the ``refine`` method unless a different value
of ``n_steps`` is explicitly passed when running ``refine``.
cont_slicing : bool, optional
Flag to decide between two possible behaviours when the number of ``MD_steps`` is
larger than the minimum required to calculate the observables. If ``False`` (default)
then the ``CompactTrajectory`` is sliced into non-overlapping sub-``CompactTrajectory``
blocks for each of which the observable is calculated. If ``True``, then the
``CompactTrajectory`` is sliced into as many non-identical sub-``CompactTrajectory``
blocks as possible (with overlap allowed).
results_filename: str
The name of the file in which the results of the MDMC run will be stored
MC_norm: int
1 if the MMC minimiser is to be used for parameter optimisation.
use_average : bool, optional
Optional parameter relevant in case ``MD_steps`` is larger than the minimum required
and the MD ``CompactTrajectory`` is sliced into sub-``CompactTrajectory`` blocks.
If ``True`` (default) the observables are averaged over the sub-``CompactTrajectory``
blocks. If ``False`` they are not averaged.
Example
-------
An example of an exp_dataset list is::
[{'file_name':data.LAMP_SQW_FILE,
'type':'SQw',
'reader':'LAMPSQw',
'weight':1.,
'resolution':{'file':data.LAMP_SQW_VAN_FILE}
'rescale_factor':0.5},
{'file_name:data.ANOTHER_FILE',
'type':'FQt',
'reader':'GENERIC_READER',
'weight':0.5,
'resolution':{'gaussian':2.35}
'auto_scale':True}]
Attributes
----------
simulation : Simulation
The ``Simulation`` on which is used to perform the refinement
exp_datasets : `list of dicts`
One `dict` per experimental dataset used for the refinement
fit_parameters : Parameters
All ``Parameter`` objects which will be refined
minimizer : Minimizer
Refines the potential parameters.
observable_pairs : `list` of ``ObservablePairs``
Experimental observable/MD observable pairs which are used to calculate
the Figure of Merit
FoM_calculator : FigureOfMeritCalculator
Calculates the FoM `float` from the ``observable_pairs``.
MD_steps : `int`
Number of molecular dynamics steps for each step of the refinement
"""
def __init__(self, simulation: Simulation, exp_datasets: List[dict],
fit_parameters: Parameters,
minimizer_type: str = 'MMC', FoM_options: dict = None,
reset_config: bool = True, MD_steps: int = None,
equilibration_steps: int = 0,
verbose: int = 0,
print_all_settings: bool = False,
**settings: dict):
self.step_timings: list = []
self.simulation = simulation
self.exp_datasets = exp_datasets
self.verbose = verbose
self.timings: dict = {'equilibrate': [],
'_run_MD': [],
'convert_trajectory': [],
'FoM_calculator': [],
'minimizer': [],
'TOTAL STEP': []}
# Remove any fixed, tied or parameters equal to 0 as these cannot be refined
# if a Parameters object, convert to list first for comprehension
if isinstance(fit_parameters, Parameters):
fit_parameters = list(fit_parameters.values())
fit_parameters = [p for p in fit_parameters
if (not (p.fixed or p.tied) and p.value != 0)]
self.fit_parameters = Parameters(fit_parameters)
self.reset_config = reset_config
self.equilibration_steps = equilibration_steps
self.settings = settings
self.n_steps = settings.get('n_steps')
self.results_filename = settings.get('results_filename',
f'results_{datetime.now().strftime("%Y-%m-%d--%H-%M-%S")}.csv')
settings['results_filename'] = self.results_filename
# Minimizer FoM_old is always initialised to infinity, so that first MC
# step (i.e. the setup) is always accepted.
# pylint: disable=line-too-long
# disable this pylint warning as this can't be fixed in a way that looks good
self.minimizer = MinimizerFactory.create_minimizer(minimizer_type, self,
self.fit_parameters, **settings)
# Create experimental observables from datasets and placeholders for
# experimental observables calculated from MD
self.observable_pairs = []
minimum_MD_steps = 0
for dset in exp_datasets:
try:
use_FFT = dset['use_FFT']
except KeyError:
use_FFT = True
exp_observable = self._read_observable_from_file(dset['type'],
dset['reader'],
dset['file_name'],
use_FFT)
if exp_observable.uniformity_requirements:
exp_observable = self._make_data_uniform(exp_observable)
MD_observable = self._create_empty_observable(exp_observable, exp_observable.use_FFT)
self._validate_energy(MD_observable)
auto_scale = dset.get('auto_scale', False)
rescale_factor = dset.get('rescale_factor')
if auto_scale and rescale_factor and self.verbose != -1:
print('Both `rescale_factor` and `auto_scale` set for file {};'
' scaling will be automated to minimise FoM'
''.format(dset['file_name']))
rescale_factor = 1.
elif not rescale_factor:
rescale_factor = 1.
observable_pair = ObservablePair(exp_observable,
MD_observable,
dset['weight'],
rescale_factor=rescale_factor,
auto_scale=auto_scale)
self.observable_pairs.append(observable_pair)
# Take the largest minimum number of MD_steps needed by any dataset
min_MD_steps_dset = self._calculate_minimum_MD_steps(
observable_pair)
minimum_MD_steps = max(minimum_MD_steps, min_MD_steps_dset)
if FoM_options is None or FoM_options.get('error') is None:
FoM_error = 'exp'
else:
FoM_error = FoM_options.get('error')
if FoM_options is None or FoM_options.get('norm') is None:
FoM_norm = 'data_points'
else:
FoM_norm = FoM_options.get('norm')
self.FoM_calculator = FoMFactory.create_FoM(FoM_error, self.observable_pairs,
norm=FoM_norm,
n_parameters=len(self.fit_parameters))
# Use specified MD_steps if supplied, else calculate
# cont_slicing produces small sub-trajectories, so calculation is unnecessary
if MD_steps:
try:
assert MD_steps >= minimum_MD_steps
if self.settings.get('cont_slicing', False):
self.MD_steps = MD_steps
else:
# Set self.MD_steps to be the largest number required by any of
# our observable pairs
maximum_MD_steps = minimum_MD_steps
for pair in self.observable_pairs:
max_MD_steps_pair = self._calculate_maximum_MD_steps(
MD_steps, pair)
maximum_MD_steps = max(maximum_MD_steps, max_MD_steps_pair)
self.MD_steps = maximum_MD_steps
except AssertionError as error:
raise ValueError('Experimental datasets provided require a '
f'minimum MD_steps value of {minimum_MD_steps} in order to '
'calculate observables') from error
else:
self.MD_steps = minimum_MD_steps
# read in resolutions for experimental datasets
for i, dset in enumerate(exp_datasets):
resolution_factory = ResolutionFactory()
dt = self.simulation.time_step * self.simulation.traj_step
if 'resolution' not in dset.keys():
# create list of user keys for resolutions to add to the error
userkeys = []
for setting in resolution_factory.resolutions:
userkeys.append(setting.lower().replace('resolution', ''))
raise KeyError("A resolution function must be added. Recognised functions are " +
str(userkeys) +
". If you meant to apply no resolution,"
" then specify resolution as None for the exp_dataset parameters.")
resolution = resolution_factory.create_instance(dset['resolution'], dset['type'],
dset['reader'], dt)
self.observable_pairs[i].exp_obs.resolution = resolution
self.observable_pairs[i].MD_obs.resolution = resolution
# setup the dataframe for stdout
data_array = [
["-"],
[minimizer_type],
[self.FoM_calculator.__class__.__name__],
[len(self.observable_pairs)],
[len(self.fit_parameters)],
]
index_array = [
'- Attributes',
' Minimizer',
' FoM type',
' Number of observables',
' Number of parameters'
]
# Printing settings
if print_all_settings:
for item in ["MD_steps", "equilibration_steps", "reset_config", "verbose"]:
index_array.append(f' {item}')
data_array.append([self.__dict__[item]])
# pylint: disable=consider-using-dict-items
index_array.append("- Control Settings")
data_array.append(["-"])
for setting in settings:
index_array.append(f' {setting}')
data_array.append([settings[setting]])
index_array.append("- Parameters")
data_array.append(["-"])
for parameter in fit_parameters:
index_array.append(f' {parameter.name}')
data_array.append([parameter.value])
index_array.append("- Experimental Datasets")
data_array.append(["-"])
if exp_datasets is not None:
for dataset in exp_datasets:
for key in dataset.keys():
index_array.append(f' {key}')
data_array.append([dataset[key]])
index_array.append("- FoM Options")
data_array.append(["-"])
if FoM_options is not None:
for key in FoM_options.keys():
index_array.append(f' {key}')
data_array.append([FoM_options[key]])
setup_frame = pd.DataFrame(data=data_array,
index=index_array)
if self.verbose != -1:
print(f'Control created with:\n{setup_frame.to_string(index=True, header=False)}\n')
def __str__(self) -> str:
exp_dataset_types = [dataset['type'] for dataset in self.exp_datasets]
# plural adds "s" to end of "parameter" if there is more than one parameter
plural = ("" if len(self.fit_parameters) == 1 else "s")
return (f"{self.__class__.__name__} refining {len(self.fit_parameters)} parameter{plural} "
f"using {exp_dataset_types} data types")
[docs] def refine(self, n_steps: int = None) -> None:
"""
Refines the specified potential parameters
Parameters
----------
n_steps : int
Maximum number of steps for the refinement. Must be specified either when creating
the ``Control`` object or when calling this method. The value specified to this
method supersedes the value passed (if any) when the ``Control`` object was created.
If nothing is passed, the method will check if a number was specified when the
``Control`` object was created and use that value.
Examples
--------
Perform a refinement with a maximum of 100 steps:
.. highlight:: python
.. code-block:: python
control.refine(100)
"""
if n_steps is None:
if self.n_steps is None:
raise ValueError('The number of maximum refinement steps has not been specified')
else:
self.n_steps = n_steps
# calculate verbose steps
# 4 steps per refinement step, and n + 1 steps total
verbose_steps = (self.n_steps + 1) * 4
# initialise step timings list for average step timings at end
self.step_timings = []
count = -1
if self.verbose != -1:
self._print_header() # creates stdout header
verbose_manager = VerboseManager.instance()
verbose_manager.start(verbose_steps, verbose=self.verbose)
while count < n_steps and not self.minimizer.has_converged():
if count >= 0 and self.equilibration_steps > 0:
self.equilibrate()
verbose_manager.header(f"Step {count + 1}")
self.step() # advance the refinement by one step
count += 1
if self.verbose == 3: # if progress bar is there, ensure data is on new line
print("")
self._print_data()
else:
while count < n_steps and not self.minimizer.has_converged():
if count >= 0 and self.equilibration_steps > 0:
self.equilibrate()
self.step() # advance the refinement by one step
count += 1
# Try/except accounts for n_steps <= -1
try:
# Reset the minimizer parameters to those from the final FoM:
# to account for a current side effect of step()
self.minimizer.reset_parameters()
self._update_engine_parameters()
except TypeError:
pass
# print values of final parameters
result_string = self.minimizer.present_result()
if self.verbose != -1:
print(result_string)
# If automatically scaling data print the scale factor for each dataset
scaling_keys = []
scaling_values = []
for i, observable_pair in enumerate(self.observable_pairs):
if observable_pair.auto_scale:
dset = self.exp_datasets[i]
scaling_keys.append(' {}'.format(dset['file_name']))
scaling_values.append([observable_pair.rescale_factor])
if len(scaling_keys) > 0 and len(scaling_values) > 0:
scaling_df = pd.DataFrame(scaling_values, index=scaling_keys)
if self.verbose != -1:
print(
f'\nAutomatic Scale Factors\n{scaling_df.to_string(index=True, header=False)}')
if self.verbose >= 1:
average_timing = statistics.mean(self.step_timings)
if self.verbose != -1:
print(
f'\nAverage time per step was {np.round_(average_timing, 2)} seconds.')
verbose_manager.finish("Refinement")
[docs] def equilibrate(self) -> None:
"""
Run molecular dynamics to equilibrate the ``Universe``.
"""
self.simulation.run(self.equilibration_steps, equilibration=True,
verbose=False)
[docs] def step(self) -> None:
"""
Do a full step: generate and run MD to calculate FoM for existing
parameters, iterate parameters a step forward and reset MD (phasespace)
if previous step was rejected and reset_config = true
"""
verbose_manager = VerboseManager.instance()
verbose_manager.start(4, verbose=self.verbose)
# Generate FoM by running MD for this step and then calculate FoM
fom = self._generate_FoM()
verbose_manager.step("Selecting new parameters and updating engine")
# Select new parameters to consider
self.minimizer.step(fom)
# Update the MD engine with new parameters
self._update_engine_parameters()
# When reset_config=true reset the MD (phasespace) back if the
# previous step was rejected
if self.reset_config:
if self.minimizer.state_changed:
# Set MD engine to remember new config
self.simulation.engine.save_config()
else:
# Set MD engine to reset to old config
self.simulation.engine.reset_config()
self.minimizer.write_history(self.results_filename)
step_timings = verbose_manager.finish("Refinement step")
self.step_timings.append(step_timings)
[docs] def plot_results(self, filename: str=None, points: int=100000, MH_norm: float=20.0) -> None:
"""
Instantiates an insstance of the PlotResults class and generates a cornerplot
from the data in self.results_filename
Parameters
----------
filename : str, optional
The filename and path to the history file. Defaults to None which
then uses self.results_filename
points : int, optional
The number of samples to initially generate, defaults to 100,000
MH_norm : float, optional
The denominator of the exponent, controlling how likley points are to be kept,
defaults to 20.0
Returns
-------
corner plot : Matplotlib.figure.Figure
A plot displaying every parameter combination with their variances and covariances
"""
if filename is None:
filename = self.results_filename
plotter = PlotResults(filename, MH_norm=MH_norm, points=points,
quantiles=[0.34, 0.5, 0.68])
cornerplot, means, stds = plotter.create_cornerplot()
if self.verbose != -1:
print(f'Parameter means = {means}, Parameter errors = {stds}')
return cornerplot
def _print_data(self) -> None:
with pd.option_context('display.max_colwidth', 12,
'display.precision', 5,
'display.float_format', '{:.4g}'.format):
n_step = self.minimizer.history.iloc[-1].name
output = self.minimizer.history.loc[[n_step]].to_string(
col_space=12, index=False, header=False).split('\n')
data = '{:4d}'.format(n_step) + ''.join(output)
print(data)
def _print_header(self) -> None:
def format_column(column):
column = column if len(column) < 13 else column[:9] + '...'
return ' ' * (12 - len(column)) + column
columns = ' '.join([format_column(col) for col
in self.minimizer.history.columns])
header = 'Step' + columns
print(header)
def _generate_FoM(self) -> float:
"""
Run the MD for an iteration/step, calculate observable, compare with
observed and return the FoM
Returns
-------
`float`
Non-negative `float` FoM
"""
self._run_MD()
self._calculate_observables(self.simulation, self.observable_pairs)
FoM_value = self.FoM_calculator.calculate()
return FoM_value
def _run_MD(self) -> None:
"""
Run a molecular dynamics simulation
"""
self.simulation.run(self.MD_steps, verbose=False)
def _update_engine_parameters(self) -> None:
"""
Update the force field parameters of the MD engine
"""
self.simulation.engine.update_parameters()
@staticmethod
def _read_observable_from_file(obstype: str, reader: str, file_name: str,
use_FFT: bool = True) -> Observable:
"""
Creates an Observable of the specified type and reads in data from file
Parameters
----------
obstype : str
The ``type`` of the ``Observable``.
reader : str
The ``type`` of the ``Reader``.
file_name : str
The absolute or relative path and the file name.
use_FFT: bool, optional
boolean determining if the FFT should be used, default is True
Returns
-------
``Observable``
An ``Observable`` of specified ``type``
"""
observable = ObservableFactory.create_observable(obstype)
observable.read_from_file(reader=reader, file_name=file_name)
observable.use_FFT = use_FFT
return observable
@staticmethod
def _create_empty_observable(exp_observable: Observable, use_FFT: bool = True) -> Observable:
"""
Creates a ``Observable`` without data but with independent variables
specified from another ``Observable``. This is a placeholder in which
the ``Observable`` can be calculated from an MD trajectory.
Parameters
----------
exp_observable : Observable
An ``Observable`` with defined independent variables.
use_FFT: bool, optional
boolean determining if the FFT should be used, default is True
Returns
-------
``Observable``
An ``Observable`` with only independent variables and
``origin == 'MD'``
"""
observable = ObservableFactory.create_observable(exp_observable.name)
observable.origin = 'MD'
observable.independent_variables = deepcopy(
exp_observable.independent_variables)
observable.use_FFT = use_FFT
return observable
def _calculate_observables(self,
simulation: Simulation,
observable_pairs: 'list[ObservablePair]') -> None:
"""
Calculates all of the ``Observable`` objects from the MD
trajectory/configurations
Parameters
----------
simulation : Simulation
``MDEngine`` with defined trajectory attribute
observable_pairs : list of ObservablePairs
``ObservablesPairs`` for which the MD ``Observable`` will be
calculated
"""
verbose_manager = VerboseManager.instance()
# this is a subprocess, so we don't bother giving maximum steps since it won't be used
verbose_manager.start(0, verbose=self.verbose)
verbose_manager.step("Converting trajectory")
trj = simulation.engine.convert_trajectory()
verbose_manager.step("Calculating observables from the MD trajectory")
for pair in observable_pairs:
obs_timings = pair.MD_obs.calculate_from_MD(trj, verbose=self.verbose, **self.settings)
if self.verbose == 1 and obs_timings is not None:
for key, value in obs_timings.items():
if key not in self.timings:
self.timings[key] = []
self.timings[key] += value
verbose_manager.finish("Calculating observables")
def _calculate_minimum_MD_steps(self, observable_pair: ObservablePair) -> int:
"""
Calculates the minimum number of steps required for the MD engine in
order to calculate MD ``Observables`` objects with the same independent
variables as the experimental ``Observable`` objects.
Parameters
----------
observable_pair : ObservablePair
``ObservablesPair`` for which the required number of ``MD_steps``
is calculated
Returns
-------
`int`
Number of ``MD_steps``
"""
time_step = self.simulation.time_step
traj_step = self.simulation.traj_step
# Calculate the time separation between trajectory frames, dt
dt = time_step * traj_step
# Calculate the minimum number of trajectory frames needed for the
# calculation of the dependent_variables of observable_pair
minimum_frames = observable_pair.exp_obs.minimum_frames(dt)
return traj_step * minimum_frames
def _calculate_maximum_MD_steps(self, MD_steps: int,
observable_pair: ObservablePair) -> int:
"""
Calculates the maximum number of steps that ``observable_pair`` will be
able to use when calculating dependent variables whilst still being
below the ``MD_steps`` specified by the user. Any additional steps
beyond this would not contribute to the calculation.
If ``observable_pair.exp_obs.maximum_frames()`` is None, then all frames can be used so we
just return the largest multiple of ``traj_steps``.
Otherwise, we calculate the largest multiple of
``traj_step * observable_pair.exp_obs.maximum_frames()``, as we can then calculate the
dependent variable multiple times by taking subsets of the total trajectory.
Parameters
----------
MD_steps : int
The hard upper limit on the number of steps to run, as specified by
the user
observable_pair : ObservablePair
``ObservablesPair`` for which the required number of ``MD_steps``
is calculated
Returns
-------
int
Maximum number of ``MD_steps`` needed
"""
traj_step = self.simulation.traj_step
maximum_frames = observable_pair.exp_obs.maximum_frames()
if maximum_frames is None:
maximum_steps = traj_step
else:
maximum_steps = traj_step * maximum_frames
return maximum_steps * (MD_steps // (maximum_steps))
@staticmethod
def _is_data_uniform(observable: Observable) -> Dict[str, Dict[str, bool]]:
"""
Checks if the values for each independent variable of an ``Observable`` are uniformly
spaced and if they start at zero. This information is returned in a single dictionary.
Parameters
----------
observable : Observable
The ``Observable`` for which to check the independent variables.
Returns
-------
`Dict[str, Dict[str, bool]]`
An outer dictionary where the independent variables of the ``Observable`` are the keys,
and the values are another dictionary corresponding to that variable. This inner
dictionary has the same format for all variables, with the two keys 'uniform' and
'zeroed'. The values for these keys are booleans that state whether the data fulfils
the corresponding requirement.
Examples
--------
>>> _is_data_uniform(observable)
{'E': {'uniform': True, 'zeroed': True}, 'Q': {'uniform': True, 'zeroed': False}}
"""
uniformity_dict = {}
for var_key, var_data in observable.independent_variables.items():
uniform_data = np.linspace(
min(var_data), max(var_data), num=len(var_data))
is_uniform = np.allclose(var_data, uniform_data, rtol=1e-5)
uniformity_dict[var_key] = {
'uniform': is_uniform, 'zeroed': var_data[0] == 0}
return uniformity_dict
def _make_data_uniform(self, observable: Observable) -> Observable:
"""
Takes an ``Observable``, checks the requirements for its ``independent_variables``
to be uniform or start at zero, creates uniform grids for the variables that do not
satisfy their requirement, interpolates the ``dependent_variables`` as needed,
and returns an ``Observable`` with the uniform/interpolated variables.
Limited to ``Observables`` with two-dimensional ``dependent_variables`` (e.g. SQw).
This may change in future.
Parameters
----------
observable : Observable
An ``Observable`` for which the independent variables
need to be made uniform / to start at zero. Currently
limited to ``Observables`` for which the ``dependent_variables`` are two-dimensional.
Returns
-------
``Observable``
Returns a copy of the passed ``Observable`` with the independent variables put onto
uniform grid (for the variables where that is necessary)
and the dependent variables interpolated onto the same grid
"""
# get the uniformity requirements from the Observable
uniformity_required = observable.uniformity_requirements
if uniformity_required is None:
return observable
# determine for all independent_variables if they are currently uniform or start at zero
uniformity_state = self._is_data_uniform(observable)
# initialise helper list for the independent_variables that need to be made uniform
indep_vars_to_be_changed = []
# loop through requirements
for var_key, var_required in uniformity_required.items():
var_state = uniformity_state[var_key]
# if the variable has a requirement AND it is not satisfied
# (for either uniformity OR zero-start)
# then add it to the list of variables that need to be changed
if (var_required['uniform'] and not var_state['uniform']) or \
(var_required['zeroed'] and not var_state['zeroed']):
indep_vars_to_be_changed.append(var_key)
# if all uniformity requirements are already satisfied
# simply return the original observable
if not indep_vars_to_be_changed:
return observable
# initialise a helper dictionary to hold the new independent variables
indep_var_uniform = {}
# loop through all independent variables
for var_key in uniformity_state:
# check if the independent variable needs to be made uniform
if var_key in indep_vars_to_be_changed:
data = observable.independent_variables[var_key]
if uniformity_required[var_key]['zeroed']:
minimum = 0
else:
minimum = min(data)
uniform_data = np.linspace(minimum, max(data), num=len(data))
indep_var_uniform[var_key] = uniform_data
# if uniformity requirements are satisfied already,
# add the data points to the helper dictionary
else:
indep_var_uniform[var_key] = observable.independent_variables[var_key]
# get the indexing order of independent variables within the dependent variables
var_indexing = observable.dependent_variables_structure
# loop through the dependent variables and interpolate them
for var_key, data_list in observable.dependent_variables.items():
# Experimental Observables should only have 1 element in data_list
try:
assert len(data_list) == 1
data = data_list[0]
except AssertionError as error:
raise AssertionError('Expected experimental dataset to only have one dependent '
f'variable entry for {var_key}, '
f'but found {len(data_list)} instead') from error
# determine the dimension of the dependent variable
var_dimension = data.ndim
# interpolation for 1D
if var_dimension == 1:
x_data = observable.independent_variables[var_indexing[var_key][0]]
data_interpol = interp1d(x_data, data)
x_uniform = indep_var_uniform[var_indexing[var_key][0]]
uniform_data = data_interpol(x_uniform)
# repeat the interpolation for the errors
err_data = observable.errors[var_key][0]
err_data[err_data == float('inf')] = 0
err_interpol = interp1d(x_data, err_data)
err_uniform = err_interpol(x_uniform)
err_uniform[err_uniform == 0.] = float('inf')
# interpolation for 2D
elif var_dimension == 2:
# note: the interp2d interpolation function requires input of the form
# interp2d(x, y, z)
# where if np.size(x)=m and np.size(y)=n then np.shape(z)=(n,m)
# E.g. if x = [0,1,2]; y = [0,3]; z = [[1,2,3], [4,5,6]]
# Because Observable.dependent_variables_structure gives the order in which
# the independent variables are represented in the np.shape of the data,
# we have to reverse the order of the x and y arrays for interp2d:
x_data = observable.independent_variables[var_indexing[var_key][1]]
y_data = observable.independent_variables[var_indexing[var_key][0]]
data_interpol = interp2d(x_data, y_data, data)
# get the independent_variables that satisfy the uniformity requirements
# as created earlier
x_uniform = indep_var_uniform[var_indexing[var_key][1]]
y_uniform = indep_var_uniform[var_indexing[var_key][0]]
uniform_data = data_interpol(x_uniform, y_uniform)
# repeat the interpolation for the errors
err_data = observable.errors[var_key][0]
err_data[err_data == float('inf')] = 0
err_interpol = interp2d(x_data, y_data, err_data)
err_uniform = err_interpol(x_uniform, y_uniform)
err_uniform[err_uniform == 0.] = float('inf')
else:
raise NotImplementedError(
'Only 1D and 2D data can currently be made uniform')
# save the uniform data and errors back into the Observable
observable.dependent_variables[var_key] = [uniform_data]
observable.errors[var_key] = [err_uniform]
# finally, set the independent variables of the ``Observable`` to the uniform ones
observable.independent_variables = indep_var_uniform
return observable
def _validate_energy(self, obs: Observable) -> None:
"""
Try and validate the energy of the ``Observable`` provided, and pass if
it does not have a ``validate_energy`` function itself
Parameters
----------
obs : Observable
``Observable`` to validate
Returns
-------
None
"""
# Calculate the time separation between trajectory frames, dt, imposed
# by the simulation
dt = self.simulation.traj_step * self.simulation.time_step
with suppress(AttributeError):
obs.validate_energy(dt)