"""A module for performing the refinement"""
import logging
import statistics
from contextlib import suppress
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Union
import numpy as np
import pandas as pd
from scipy.interpolate import RectBivariateSpline, interp1d
from verbosemanager import VerboseManager
from MDMC.common.decorators import repr_decorator
from MDMC.control.plot_results import PlotResults, data_printers
from MDMC.MD.engine_facades.facade import MDEngineError
from MDMC.MD.parameters import Parameters
from MDMC.MD.simulation import Simulation
from MDMC.refinement.FoM.FoM_abs import ObservablePair
from MDMC.refinement.FoM.FoM_factory import FoMFactory
from MDMC.refinement.minimizers.minimizer_factory import MinimizerFactory
from MDMC.resolution.resolution_factory import ResolutionFactory
from MDMC.trajectory_analysis.observables.obs import Observable
from MDMC.trajectory_analysis.observables.obs_factory import ObservableFactory
[docs]
@repr_decorator('simulation', 'exp_datasets', 'FoM_calculator', 'minimizer',
'reset_config', 'fit_parameters', 'MD_steps',
'verbose')
class Control:
"""
Controls the MDMC refinement
Parameters
----------
simulation : Simulation
Performs a simulation for a given set of potential ``Parameter``
objects.
exp_datasets : list of dicts
Each `dict` represents an experimental dataset, containing the
following keys:
- ``file_name`` (`str`) the file name
- ``type`` (`str`) the type of observable
- ``reader`` (`str`) the reader required for the file
- ``weighting`` (`float`) the weighting of the dataset to be used in
the Figure of Merit calculation
- ``resolution`` : (`dict`, but can be `str`, `float`, or None)
Should be a one-line dict of format {'file' : str} or {key : float}
if {'file' : str}, the str should be a file in
the same format as ``file_name`` containing results of a vanadium
sample which is used to determine instrument energy resolution for
this dataset.
If {key : float}, the key should be the type of resolution function.
allowed types are 'gaussian' and 'lorentzian'
Can also be 'lazily' given as just a `str` or `float`.
If just a string is given, it is assumed to be a filename.
If just a float is given, it is assumed to be Gaussian.
The float should be the instrument energy resolution as the FWHM in ``ueV`` (micro eV).
If you have already accounted for instrument resolution in your dataset,
this field can be set to None, and resolution application will be skipped.
This must be done explicitly.
- ``rescale_factor`` (`float`, optional, defaults to `1.`) applied to
the experimental data when calculating the FoM to ensure it is on
the same scale as the calculated observable
- ``auto_scale`` (`bool`, optional, defaults to `False`) set the
``rescale_factor`` automatically to minimise the FoM, if both
``rescale_factor`` and ``auto_scale`` are provided then a warning
is printed and ``auto_scale`` takes precedence
- ``use_FFT`` (`bool`, optional, defaults to `True`) whether to use
Fast Fourier Transforms in the calculation of dependent variables.
FFT speeds up calculation but places restrictions on spacing in the
independent variable domain(s). This option may not be supported
for all ``Observable``s
Note that the default (and preferred) behaviour of the scaling settings requires that the
dataset provided has been properly scaled and normalised for the refinement process.
Arbitrary or automatic rescaling should be undertaken with care, as it does not take into
account any physical aspects of scaling the data, such as the presence or absence of
background events from peaks outside its range.
fit_parameters : Parameters, list of Parameter
All parameters which will be refined. Note that any ``Parameter`` that is ``fixed``,
``tied`` or equal to 0 will not be passed to the minimizer as these cannot be refined.
Those with ``constraints`` set are still passed.
minimizer_type : str, optional
The ``Minimizer`` type. Default is 'MMC'.
FoM_options : dict of {str : str}, optional
Defines the details of the ``FigureOfMeritCalculator`` to use. Default is `None`, in which
case the first option for each of the following keys is used:
- ``errors`` {'exp', 'none'} Whether to weight the differences between MD and
experimental data with the experimental error or not (errors from MD are not currently
supported).
- ``norm`` {'data_points', 'dof', 'none'} How to normalise the FoM, either by the total
number of data_points (n), the degrees of freedom (n - m, where m is the number of
parameters being varied), or not at all.
reset_config : bool, optional
Determines if the configuration is reset to the end of the last accepted
state. Default is `True`.
MD_steps : int, optional
Number of molecular dynamics steps for each step of the refinement.
When not provided, the minimum number of steps needed for successful
calculation of the observables is used. If provided, the actual number
of steps may be reduced to prevent running MD that won't be used when
calculating dependent variables. Default is `None`.
equilibration_steps : int, optional
Number of molecular dynamics steps used to equilibrate the ``Universe``
in between each refinement step. When changes to the ``Parameters`` are small,
this equilibration can be much shorter than the equilibration needed before
starting the refinement process, but in general will vary depending on
the details of the ``Universe`` and ``Parameters``. Default is 0.
verbose: int, optional
The level of verbosity:
Verbose level -1 hides all outputs for tests.
Verbose level 0 gives no information.
Verbose level 1 gives final time for the whole method.
Verbose level 2 gives final time and also a progress bar.
Verbose level 3 gives final time, a progress bar, and time per step.
print_full_settings: bool, optional
Whether to print all settings/attributes/parameters passed to this object,
defaults to False.
**settings: dict, optional
n_steps: int
The maximum number of refinement steps to be. An optional parameter that,
if specified, will be used in the ``refine`` method unless a different value
of ``n_steps`` is explicitly passed when running ``refine``.
cont_slicing : bool, optional
Flag to decide between two possible behaviours when the number of ``MD_steps`` is
larger than the minimum required to calculate the observables. If ``False`` (default)
then the ``CompactTrajectory`` is sliced into non-overlapping sub-``CompactTrajectory``
blocks for each of which the observable is calculated. If ``True``, then the
``CompactTrajectory`` is sliced into as many non-identical sub-``CompactTrajectory``
blocks as possible (with overlap allowed).
results_filename: str
The name of the file in which the results of the MDMC run will be stored
MC_norm: int
1 if the MMC minimiser is to be used for parameter optimisation.
use_average : bool, optional
Optional parameter relevant in case ``MD_steps`` is larger than the minimum required
and the MD ``CompactTrajectory`` is sliced into sub-``CompactTrajectory`` blocks.
If ``True`` (default) the observables are averaged over the sub-``CompactTrajectory``
blocks. If ``False`` they are not averaged.
data_printer: str, default 'plaintext'
How to display the data during minimisation. Current options are 'plaintext' (default,
plaintext printing) or 'ipython' (prettier HTML printing via iPython)
time_step_reductions : int, optional
The number of attempts to reduce the time_step of the simulation in order to fix an
equilibration that keeps failing. Defaults to a value of 2.
Example
-------
An example of an exp_dataset list is::
[{'file_name':data.LAMP_SQW_FILE,
'type':'SQw',
'reader':'LAMPSQw',
'weight':1.,
'resolution':{'file':data.LAMP_SQW_VAN_FILE}
'rescale_factor':0.5},
{'file_name:data.ANOTHER_FILE',
'type':'FQt',
'reader':'GENERIC_READER',
'weight':0.5,
'resolution':{'gaussian':2.35}
'auto_scale':True}]
Attributes
----------
simulation : Simulation
The ``Simulation`` on which is used to perform the refinement
exp_datasets : `list of dicts`
One `dict` per experimental dataset used for the refinement
fit_parameters : Parameters
All ``Parameter`` objects which will be refined
minimizer : Minimizer
Refines the potential parameters.
observable_pairs : `list` of ``ObservablePairs``
Experimental observable/MD observable pairs which are used to calculate
the Figure of Merit
FoM_calculator : FigureOfMeritCalculator
Calculates the FoM `float` from the ``observable_pairs``.
MD_steps : `int`
Number of molecular dynamics steps for each step of the refinement
"""
def __init__(self, simulation: Simulation, exp_datasets: List[dict],
fit_parameters: Parameters,
minimizer_type: str = 'MMC', FoM_options: dict = None,
reset_config: bool = True, MD_steps: int = None,
equilibration_steps: int = 0,
previous_history: Union[str,Path] = None,
verbose: int = 0,
print_all_settings: bool = False,
**settings: dict):
self.previous_history = previous_history
self.step_timings: list = []
self.simulation = simulation
self.exp_datasets = exp_datasets
self.verbose = verbose
self.timings: dict = {'equilibrate': [],
'_run_MD': [],
'convert_trajectory': [],
'FoM_calculator': [],
'minimizer': [],
'TOTAL STEP': []}
# Remove any fixed, tied or parameters equal to 0 as these cannot be refined
# if a Parameters object, convert to list first for comprehension
if isinstance(fit_parameters, Parameters):
fit_parameters = list(fit_parameters.values())
fit_parameters = [p for p in fit_parameters
if (not (p.fixed or p.tied) and p.value != 0)]
self.fit_parameters = Parameters(fit_parameters)
self.reset_config = reset_config
self.equilibration_steps = equilibration_steps
self.settings = settings
self.n_steps = settings.get('n_steps')
self.results_filename = settings.get('results_filename',
f'results_{datetime.now().strftime("%Y-%m-%d--%H-%M-%S")}.csv')
settings['results_filename'] = self.results_filename
settings['time_step_reductions'] = settings.get('time_step_reductions', 2)
# Minimizer FoM_old is always initialised to infinity, so that first MC
# step (i.e. the setup) is always accepted.
# pylint: disable=line-too-long
# disable this pylint warning as this can't be fixed in a way that looks good
self.minimizer = MinimizerFactory.create_minimizer(minimizer_type, self,self.fit_parameters,
previous_history, **settings)
self.first_loaded_step = bool(previous_history)
# Create experimental observables from datasets and placeholders for
# experimental observables calculated from MD
self.observable_pairs = []
minimum_MD_steps = 0
for dset in exp_datasets:
try:
use_FFT = dset['use_FFT']
except KeyError:
use_FFT = True
# keep the keys in the _dset_input_check function
# consistent with the ones retrieved from dset
self._input_check(dset, inputs = ['type','reader', 'file_name'])
exp_observable = self._read_observable_from_file(dset['type'],
dset['reader'],
dset['file_name'],
use_FFT)
if exp_observable.uniformity_requirements:
exp_observable = self._make_data_uniform(exp_observable)
MD_observable = self._create_empty_observable(exp_observable, exp_observable.use_FFT)
self._validate_energy(MD_observable)
auto_scale = dset.get('auto_scale', False)
rescale_factor = dset.get('rescale_factor')
if auto_scale and rescale_factor and self.verbose != -1:
print('Both `rescale_factor` and `auto_scale` set for file {};'
' scaling will be automated to minimise FoM'
''.format(dset['file_name']))
rescale_factor = 1.
elif not rescale_factor:
rescale_factor = 1.
observable_pair = ObservablePair(exp_observable,
MD_observable,
dset['weight'],
rescale_factor=rescale_factor,
auto_scale=auto_scale)
self.observable_pairs.append(observable_pair)
self.recreated_independent_vars = {}
self.production_time_step = self.simulation.time_step
# Take the largest minimum number of MD_steps needed by any dataset
min_MD_steps_dset = self._calculate_minimum_MD_steps(
observable_pair)
minimum_MD_steps = max(minimum_MD_steps, min_MD_steps_dset)
if FoM_options is None or FoM_options.get('error') is None:
FoM_error = 'exp'
else:
FoM_error = FoM_options.get('error')
if FoM_options is None or FoM_options.get('norm') is None:
FoM_norm = 'data_points'
else:
FoM_norm = FoM_options.get('norm')
self.FoM_calculator = FoMFactory.create_FoM(FoM_error, self.observable_pairs,
norm=FoM_norm,
n_parameters=len(self.fit_parameters))
self.max_FoM = self.calculate_max_FoM()
# Use specified MD_steps if supplied, else calculate
# cont_slicing produces small sub-trajectories, so calculation is unnecessary
if MD_steps:
try:
assert MD_steps >= minimum_MD_steps
if self.settings.get('cont_slicing', False):
self.MD_steps = MD_steps
else:
# Set self.MD_steps to be the largest number required by any of
# our observable pairs
maximum_MD_steps = minimum_MD_steps
for pair in self.observable_pairs:
max_MD_steps_pair = self._calculate_maximum_MD_steps(
MD_steps, pair)
maximum_MD_steps = max(maximum_MD_steps, max_MD_steps_pair)
self.MD_steps = maximum_MD_steps
except AssertionError as error:
raise ValueError('Experimental datasets provided require a '
f'minimum MD_steps value of {minimum_MD_steps} in order to '
'calculate observables') from error
else:
self.MD_steps = minimum_MD_steps
# read in resolutions for experimental datasets
for i, dset in enumerate(exp_datasets):
resolution_factory = ResolutionFactory()
dt = self.simulation.time_step * self.simulation.traj_step
if 'resolution' not in dset:
# create list of user keys for resolutions to add to the error
userkeys = []
for setting in resolution_factory.resolutions:
userkeys.append(setting.lower().replace('resolution', ''))
raise KeyError("A resolution function must be added. Recognised functions are " +
str(userkeys) +
". If you meant to apply no resolution,"
" then specify resolution as None for the exp_dataset parameters.")
resolution = resolution_factory.create_instance(dset['resolution'], dset['type'],
dset['reader'], dt)
self.observable_pairs[i].exp_obs.resolution = resolution
self.observable_pairs[i].MD_obs.resolution = resolution
# setup the dataframe for stdout
data_array = [
["-"],
[minimizer_type],
[self.FoM_calculator.__class__.__name__],
[len(self.observable_pairs)],
[len(self.fit_parameters)],
]
index_array = [
'- Attributes',
' Minimizer',
' FoM type',
' Number of observables',
' Number of parameters',
]
# Printing settings
if print_all_settings:
for item in ["MD_steps", "equilibration_steps", "reset_config", "verbose"]:
index_array.append(f' {item}')
data_array.append([self.__dict__[item]])
# pylint: disable=consider-using-dict-items
index_array.append("- Control Settings")
data_array.append(["-"])
for setting in settings:
index_array.append(f' {setting}')
data_array.append([settings[setting]])
index_array.append("- Parameters")
data_array.append(["-"])
for parameter in fit_parameters:
index_array.append(f' {parameter.name}')
data_array.append([parameter.value])
index_array.append("- Experimental Datasets")
data_array.append(["-"])
if exp_datasets is not None:
for dataset in exp_datasets:
for key in dataset:
index_array.append(f' {key}')
data_array.append([dataset[key]])
index_array.append("- FoM Options")
data_array.append(["-"])
if FoM_options is not None:
for key in FoM_options:
index_array.append(f' {key}')
data_array.append([FoM_options[key]])
setup_frame = pd.DataFrame(data=data_array,
index=index_array)
if self.verbose != -1:
print(f'Control created with:\n{setup_frame.to_string(index=True, header=False)}\n')
# set up data printer and attach minimiser history to it
self.data_printer = data_printers[settings.get('data_printer', 'plaintext')]()
def __str__(self) -> str:
exp_dataset_types = [dataset['type'] for dataset in self.exp_datasets]
# plural adds "s" to end of "parameter" if there is more than one parameter
plural = ("" if len(self.fit_parameters) == 1 else "s")
return (f"{self.__class__.__name__} refining {len(self.fit_parameters)} parameter{plural} "
f"using {exp_dataset_types} data types")
[docs]
def refine(self, n_steps: int = None) -> None:
"""
Refines the specified potential parameters
Parameters
----------
n_steps : int
Maximum number of steps for the refinement. Must be specified either when creating
the ``Control`` object or when calling this method. The value specified to this
method supersedes the value passed (if any) when the ``Control`` object was created.
If nothing is passed, the method will check if a number was specified when the
``Control`` object was created and use that value.
Examples
--------
Perform a refinement with a maximum of 100 steps:
.. highlight:: python
.. code-block:: python
control.refine(100)
"""
if not n_steps:
if self.n_steps is None:
raise ValueError('The number of maximum refinement steps needs to be'
' specified and greater than 0.')
else:
self.n_steps = n_steps
# calculate verbose steps
# 4 steps per refinement step, and n + 1 steps total
verbose_steps = (self.n_steps + 1) * 4
# initialise step timings list for average step timings at end
self.step_timings = []
count = -1
if self.verbose != -1:
self.data_printer.print_header(self.minimizer.history) # creates stdout header
verbose_manager = VerboseManager.instance()
verbose_manager.start(verbose_steps, verbose=self.verbose)
while count < n_steps and not self.minimizer.has_converged():
bad_param_location = False
if count >= 0:
try:
self.equilibrate(self.equilibration_steps)
except MDEngineError:
bad_param_location = True # assuming params are bad and moving on
self.reset_engine()
verbose_manager.header(f"Step {count + 1}")
# advance the refinement by one step
self.step(bad_param_location=bad_param_location)
count += 1
if self.verbose == 3: # if progress bar is there, ensure data is on new line
print("")
self.data_printer.print_data(self.minimizer.history)
else:
while count < n_steps and not self.minimizer.has_converged():
bad_param_location = False
if count >= 0:
try:
self.equilibrate(self.equilibration_steps)
except MDEngineError:
bad_param_location = True # assuming params are bad and moving on
self.reset_engine()
# advance the refinement by one step
self.step(bad_param_location=bad_param_location)
count += 1
# Try/except accounts for n_steps <= -1
try:
# Reset the minimizer parameters to those from the final FoM:
# to account for a current side effect of step()
self.minimizer.reset_parameters()
self._update_engine_parameters()
except TypeError:
pass
# print values of final parameters
result_string = self.minimizer.present_result()
if self.verbose != -1:
print(result_string)
# If automatically scaling data print the scale factor for each dataset
scaling_keys = []
scaling_values = []
for i, observable_pair in enumerate(self.observable_pairs):
if observable_pair.auto_scale:
dset = self.exp_datasets[i]
scaling_keys.append(' {}'.format(dset['file_name']))
scaling_values.append([observable_pair.rescale_factor])
if len(scaling_keys) > 0 and len(scaling_values) > 0:
scaling_df = pd.DataFrame(scaling_values, index=scaling_keys)
if self.verbose != -1:
print(
f'\nAutomatic Scale Factors\n{scaling_df.to_string(index=True, header=False)}')
if self.verbose >= 1:
average_timing = statistics.mean(self.step_timings)
if self.verbose != -1:
print(
f'\nAverage time per step was {np.round(average_timing, 2)} seconds.')
verbose_manager.finish("Refinement")
[docs]
def minimize(self, n_steps: int,
minimize_every: int = 10,
verbose: bool = False, output_log: str = None,
work_dir: str = None, **settings: dict) -> None:
"""
Performs an MD run intertwined with periodic structure relaxation.
This way after a local minimum is found, the system is taken
out of the minimum to explore a larger volume of the parameter
space.
Parameters
----------
n_steps : int
Total number of the MD run steps
minimize_every: int, optional
Number of MD steps between two consecutive minimizations
verbose: bool, optional
Whether to print statements when the minimization has been started and completed
(including the number of minimization steps and time taken). Default is `False`.
output_log: str, optional
Log file for the MD engine to write to. Default is `None`.
work_dir: str, optional
Working directory for the MD engine to write to. Default is `None`.
**settings
``etol`` (`float`)
If the energy change between iterations is less than ``etol``,
minimization is stopped. Default depends on engine used.
``ftol`` (`float`)
If the magnitude of the global force is less than ``ftol``,
minimization is stopped. Default depends on engine used.
``maxiter`` (`int`)
Maximum number of iterations of a single structure
relaxation procedure. Default depends on engine used.
``maxeval`` (`int`)
Maximum number of force evaluations to perform. Default depends
on engine used.
"""
try:
self.simulation.minimize(n_steps,minimize_every,verbose,
output_log,work_dir, **settings)
except Exception as exc:
raise Exception('Minimization failed, please check the parameter values.') from exc
[docs]
def equilibrate(self, n_steps: int = None, verbose: bool = False,
output_log: str = None, work_dir: str = None, **settings: dict) -> None:
"""
Run molecular dynamics to equilibrate the ``Universe``.
If the equilibration fails, a method to reduce the time_step and re-try, is called.
If this re-try also fails, then an MDEngineError is raised.
Parameters
----------
n_steps : int
Number of simulation steps to run.
verbose: bool, optional
Whether to print statements upon starting and completing the run.
Default is `False`.
output_log: str, optional
Log file for the MD engine to write to. Default is `None`.
work_dir: str, optional
Working directory for the MD engine to write to. Default is `None`.
"""
try:
if not n_steps:
self.simulation.auto_equilibrate()
else:
self.simulation.run(n_steps, True, verbose, output_log, work_dir,**settings)
except MDEngineError:
try:
self.engine_recovery_from_equil(n_steps=n_steps,verbose=verbose,
output_log=output_log,work_dir=work_dir,**settings)
except MDEngineError as exc:
logging.exception('The MD engine produced an error. This is often due to '
'bad constraints or parameter values - please check these and try again.')
raise MDEngineError from exc
[docs]
def step(self, bad_param_location: bool = False) -> None:
"""
Do a full step: generate and run MD to calculate FoM for existing
parameters, iterate parameters a step forward and reset MD (phasespace)
if previous step was rejected and reset_config = true
Parameters
---------
bad_param_location : bool
Represents whether the equilibration at these parameter value failed or not.
If equilibration failed, value is True.
"""
verbose_manager = VerboseManager.instance()
verbose_manager.start(4, verbose=self.verbose)
if self.first_loaded_step:
fom = self.minimizer.FoM_old
self.first_loaded_step = False
elif not bad_param_location:
# Generate FoM by running MD for this step and then calculate FoM
fom = self._generate_FoM()
else:
# assuming params are bad so use max FoM available
fom = self.max_FoM
verbose_manager.step("Selecting new parameters and updating engine")
# Select new parameters to consider
self.minimizer.step(fom)
# Update the MD engine with new parameters
self._update_engine_parameters()
# When reset_config=true reset the MD (phasespace) back if the
# previous step was rejected
if self.reset_config:
if self.minimizer.state_changed:
# Set MD engine to remember new config
self.simulation.engine.save_config()
else:
# Set MD engine to reset to old config
self.simulation.engine.reset_config()
self.minimizer.write_history(self.results_filename)
step_timings = verbose_manager.finish("Refinement step")
self.step_timings.append(step_timings)
[docs]
def plot_results(self, filename: str=None, points: int=100000, MH_norm: float=20.0) -> None:
"""
Instantiates an insstance of the PlotResults class and generates a cornerplot
from the data in self.results_filename
Parameters
----------
filename : str, optional
The filename and path to the history file. Defaults to None which
then uses self.results_filename
points : int, optional
The number of samples to initially generate, defaults to 100,000
MH_norm : float, optional
The denominator of the exponent, controlling how likley points are to be kept,
defaults to 20.0
Returns
-------
corner plot : Matplotlib.figure.Figure
A plot displaying every parameter combination with their variances and covariances
"""
if filename is None:
filename = self.results_filename
plotter = PlotResults(filename, MH_norm=MH_norm, points=points,
quantiles=[0.34, 0.5, 0.68])
cornerplot, means, stds = plotter.create_cornerplot()
if self.verbose != -1:
print(f'Parameter means = {means}, Parameter errors = {stds}')
return cornerplot
def _generate_FoM(self) -> float:
"""
Run the MD for an iteration/step, calculate observable, compare with
observed and return the FoM
Returns
-------
`float`
Non-negative `float` FoM
"""
try:
self._run_MD()
self._calculate_observables(self.simulation, self.observable_pairs)
self._trim_dependent_variables()
FoM_value = self.FoM_calculator.calculate()
except MDEngineError:
FoM_value = self.max_FoM
return FoM_value
def _run_MD(self) -> None:
"""
Run a molecular dynamics simulation
"""
self.simulation.run(self.MD_steps, verbose=False)
def _update_engine_parameters(self) -> None:
"""
Update the force field parameters of the MD engine
"""
self.simulation.engine.update_parameters()
@staticmethod
def _read_observable_from_file(obstype: str, reader: str, file_name: str,
use_FFT: bool = True) -> Observable:
"""
Creates an Observable of the specified type and reads in data from file
Parameters
----------
obstype : str
The ``type`` of the ``Observable``.
reader : str
The ``type`` of the ``Reader``.
file_name : str
The absolute or relative path and the file name.
use_FFT: bool, optional
boolean determining if the FFT should be used, default is True
Returns
-------
``Observable``
An ``Observable`` of specified ``type``
"""
observable = ObservableFactory.create_observable(obstype)
observable.read_from_file(reader=reader, file_name=file_name)
observable.use_FFT = use_FFT
return observable
@staticmethod
def _create_empty_observable(exp_observable: Observable, use_FFT: bool = True) -> Observable:
"""
Creates a ``Observable`` without data but with independent variables
specified from another ``Observable``. This is a placeholder in which
the ``Observable`` can be calculated from an MD trajectory.
Parameters
----------
exp_observable : Observable
An ``Observable`` with defined independent variables.
use_FFT: bool, optional
boolean determining if the FFT should be used, default is True
Returns
-------
``Observable``
An ``Observable`` with only independent variables and
``origin == 'MD'``
"""
observable = ObservableFactory.create_observable(exp_observable.name)
observable.origin = 'MD'
observable.independent_variables = deepcopy(
exp_observable.independent_variables)
observable.use_FFT = use_FFT
return observable
def _calculate_observables(self,
simulation: Simulation,
observable_pairs: 'list[ObservablePair]') -> None:
"""
Calculates all of the ``Observable`` objects from the MD
trajectory/configurations
Parameters
----------
simulation : Simulation
``MDEngine`` with defined trajectory attribute
observable_pairs : list of ObservablePairs
``ObservablesPairs`` for which the MD ``Observable`` will be
calculated
"""
verbose_manager = VerboseManager.instance()
# this is a subprocess, so we don't bother giving maximum steps since it won't be used
verbose_manager.start(0, verbose=self.verbose)
verbose_manager.step("Converting trajectory")
trj = simulation.engine.convert_trajectory()
verbose_manager.step("Calculating observables from the MD trajectory")
for pair in observable_pairs:
obs_timings = pair.MD_obs.calculate_from_MD(trj, verbose=self.verbose, **self.settings)
if pair.MD_obs.name =='SQw':
self.recreated_independent_vars['SQw'] = pair.MD_obs.recreated_Q
if self.verbose == 1 and obs_timings is not None:
for key, value in obs_timings.items():
if key not in self.timings:
self.timings[key] = []
self.timings[key] += value
verbose_manager.finish("Calculating observables")
def _calculate_minimum_MD_steps(self, observable_pair: ObservablePair) -> int:
"""
Calculates the minimum number of steps required for the MD engine in
order to calculate MD ``Observables`` objects with the same independent
variables as the experimental ``Observable`` objects.
Parameters
----------
observable_pair : ObservablePair
``ObservablesPair`` for which the required number of ``MD_steps``
is calculated
Returns
-------
`int`
Number of ``MD_steps``
"""
time_step = self.simulation.time_step
traj_step = self.simulation.traj_step
# Calculate the time separation between trajectory frames, dt
dt = time_step * traj_step
# Calculate the minimum number of trajectory frames needed for the
# calculation of the dependent_variables of observable_pair
minimum_frames = observable_pair.exp_obs.minimum_frames(dt)
return traj_step * minimum_frames
def _calculate_maximum_MD_steps(self, MD_steps: int,
observable_pair: ObservablePair) -> int:
"""
Calculates the maximum number of steps that ``observable_pair`` will be
able to use when calculating dependent variables whilst still being
below the ``MD_steps`` specified by the user. Any additional steps
beyond this would not contribute to the calculation.
If ``observable_pair.exp_obs.maximum_frames()`` is None, then all frames can be used so we
just return the largest multiple of ``traj_steps``.
Otherwise, we calculate the largest multiple of
``traj_step * observable_pair.exp_obs.maximum_frames()``, as we can then calculate the
dependent variable multiple times by taking subsets of the total trajectory.
Parameters
----------
MD_steps : int
The hard upper limit on the number of steps to run, as specified by
the user
observable_pair : ObservablePair
``ObservablesPair`` for which the required number of ``MD_steps``
is calculated
Returns
-------
int
Maximum number of ``MD_steps`` needed
"""
traj_step = self.simulation.traj_step
maximum_frames = observable_pair.exp_obs.maximum_frames()
maximum_steps = traj_step if maximum_frames is None else traj_step * maximum_frames
return maximum_steps * (MD_steps // (maximum_steps))
@staticmethod
def _is_data_uniform(observable: Observable) -> Dict[str, Dict[str, bool]]:
"""
Checks if the values for each independent variable of an ``Observable`` are uniformly
spaced and if they start at zero. This information is returned in a single dictionary.
Parameters
----------
observable : Observable
The ``Observable`` for which to check the independent variables.
Returns
-------
`Dict[str, Dict[str, bool]]`
An outer dictionary where the independent variables of the ``Observable`` are the keys,
and the values are another dictionary corresponding to that variable. This inner
dictionary has the same format for all variables, with the two keys 'uniform' and
'zeroed'. The values for these keys are booleans that state whether the data fulfils
the corresponding requirement.
Examples
--------
>>> _is_data_uniform(observable)
{'E': {'uniform': True, 'zeroed': True}, 'Q': {'uniform': True, 'zeroed': False}}
"""
uniformity_dict = {}
for var_key, var_data in observable.independent_variables.items():
uniform_data = np.linspace(
min(var_data), max(var_data), num=len(var_data))
is_uniform = np.allclose(var_data, uniform_data, rtol=1e-5)
uniformity_dict[var_key] = {
'uniform': is_uniform, 'zeroed': var_data[0] == 0}
return uniformity_dict
def _make_data_uniform(self, observable: Observable) -> Observable:
"""
Takes an ``Observable``, checks the requirements for its ``independent_variables``
to be uniform or start at zero, creates uniform grids for the variables that do not
satisfy their requirement, interpolates the ``dependent_variables`` as needed,
and returns an ``Observable`` with the uniform/interpolated variables.
Limited to ``Observables`` with two-dimensional ``dependent_variables`` (e.g. SQw).
This may change in future.
Parameters
----------
observable : Observable
An ``Observable`` for which the independent variables
need to be made uniform / to start at zero. Currently
limited to ``Observables`` for which the ``dependent_variables`` are two-dimensional.
Returns
-------
``Observable``
Returns a copy of the passed ``Observable`` with the independent variables put onto
uniform grid (for the variables where that is necessary)
and the dependent variables interpolated onto the same grid
"""
# get the uniformity requirements from the Observable
uniformity_required = observable.uniformity_requirements
if uniformity_required is None:
return observable
# determine for all independent_variables if they are currently uniform or start at zero
uniformity_state = self._is_data_uniform(observable)
# initialise helper list for the independent_variables that need to be made uniform
indep_vars_to_be_changed = []
# loop through requirements
for var_key, var_required in uniformity_required.items():
var_state = uniformity_state[var_key]
# if the variable has a requirement AND it is not satisfied
# (for either uniformity OR zero-start)
# then add it to the list of variables that need to be changed
if (var_required['uniform'] and not var_state['uniform']) or \
(var_required['zeroed'] and not var_state['zeroed']):
indep_vars_to_be_changed.append(var_key)
# if all uniformity requirements are already satisfied
# simply return the original observable
if not indep_vars_to_be_changed:
return observable
# initialise a helper dictionary to hold the new independent variables
indep_var_uniform = {}
# loop through all independent variables
for var_key in uniformity_state:
# check if the independent variable needs to be made uniform
if var_key in indep_vars_to_be_changed:
data = observable.independent_variables[var_key]
minimum = 0 if uniformity_required[var_key]['zeroed'] else min(data)
uniform_data = np.linspace(minimum, max(data), num=len(data))
indep_var_uniform[var_key] = uniform_data
# if uniformity requirements are satisfied already,
# add the data points to the helper dictionary
else:
indep_var_uniform[var_key] = observable.independent_variables[var_key]
# get the indexing order of independent variables within the dependent variables
var_indexing = observable.dependent_variables_structure
# loop through the dependent variables and interpolate them
for var_key, data_list in observable.dependent_variables.items():
# Experimental Observables should only have 1 element in data_list
try:
assert len(data_list) == 1
data = data_list[0]
except AssertionError as error:
raise AssertionError('Expected experimental dataset to only have one dependent '
f'variable entry for {var_key}, '
f'but found {len(data_list)} instead') from error
# determine the dimension of the dependent variable
var_dimension = data.ndim
# interpolation for 1D
if var_dimension == 1:
x_data = observable.independent_variables[var_indexing[var_key][0]]
data_interpol = interp1d(x_data, data)
x_uniform = indep_var_uniform[var_indexing[var_key][0]]
uniform_data = data_interpol(x_uniform)
# repeat the interpolation for the errors
err_data = observable.errors[var_key][0]
err_data[err_data == float('inf')] = 0
err_interpol = interp1d(x_data, err_data)
err_uniform = err_interpol(x_uniform)
err_uniform[err_uniform == 0.] = float('inf')
# interpolation for 2D
elif var_dimension == 2:
# Because Observable.dependent_variables_structure gives the order in which
# the independent variables are represented in the np.shape of the data,
# we have to reverse the order of the x and y arrays for interpolation:
x_data = observable.independent_variables[var_indexing[var_key][1]]
y_data = observable.independent_variables[var_indexing[var_key][0]]
data_interpol = RectBivariateSpline(x_data, y_data, data.T)
# get the independent_variables that satisfy the uniformity requirements
# as created earlier
x_uniform = indep_var_uniform[var_indexing[var_key][1]]
y_uniform = indep_var_uniform[var_indexing[var_key][0]]
uniform_data = data_interpol(x_uniform, y_uniform).T
# repeat the interpolation for the errors
err_data = observable.errors[var_key][0]
err_data[err_data == float('inf')] = 0
err_interpol = RectBivariateSpline(x_data, y_data, err_data.T)
err_uniform = err_interpol(x_uniform, y_uniform).T
err_uniform[err_uniform == 0.] = float('inf')
else:
raise NotImplementedError(
'Only 1D and 2D data can currently be made uniform')
# save the uniform data and errors back into the Observable
observable.dependent_variables[var_key] = [uniform_data]
observable.errors[var_key] = [err_uniform]
# finally, set the independent variables of the ``Observable`` to the uniform ones
observable.independent_variables = indep_var_uniform
return observable
def _validate_energy(self, obs: Observable) -> None:
"""
Try and validate the energy of the ``Observable`` provided, and pass if
it does not have a ``validate_energy`` function itself. The time step and trajectory step
may be changed in the validate_energy method of the specific observable if necessary, to
achieve an energy separation that matches that of the experimental data.
Parameters
----------
obs : Observable
``Observable`` to validate
Returns
-------
None
"""
with suppress(AttributeError):
changed, traj_step, time_step, self.dt_required \
= obs.validate_energy(self.simulation.time_step)
if changed:
self.simulation.traj_step = traj_step
self.simulation.time_step = time_step
logging.warning(" The given traj_step and time_step values were not"
" compatibile with the dataset specified.\nThe values "
"(whilst prioritising time_step) have been changed to"
" traj_step: %d, and time_step: %f. \n"
"Context: for this dataset, traj_step multiplied by time_step"
" must be ~= %f (6 d.p). \n" , traj_step,time_step,self.dt_required)
def _input_check(self, general_set, inputs) -> None:
"""
Handles error for retrieving data from a set where the input is not found.
This was made in a general way to be used for any dataset or set of inputs.
Parameters
----------
general_set : A general dataset
A variable that contains a set of information for input checking against,
for example 'dset' contains the observable type, file_name and reader type,
for checking inputs.
Returns
-------
None
"""
for input_check in inputs:
try:
general_set[input_check]
except KeyError as error:
raise KeyError("There was an issue retrieving the input: "
f" {input_check} "
"from the dataset, please check your inputs again.") from error
def _trim_dependent_variables(self) -> None:
"""
Trims the dependent variable data, and the associated errors, for observables
where necessary. One such application is when the smallest q_values cannot be recreated by
the simulation, this trimming removes the data associated with q_values which cannot be
recreated.
Returns
-------
None
"""
# loop through observable pairs and trim dependent var data accordingly
for index, pair in enumerate(self.observable_pairs):
obs = pair.exp_obs.name
if obs in self.recreated_independent_vars:
exp_obs = self.observable_pairs[index].exp_obs
md_obs = self.observable_pairs[index].MD_obs
recreated_independent_vars = self.recreated_independent_vars[obs]
if len(exp_obs.dependent_variables[obs][0]) != \
len(md_obs.dependent_variables[obs][0]):
exp_obs.errors[obs][0] = \
[exp_obs.errors[obs][0][num] for num in recreated_independent_vars]
exp_obs.dependent_variables[obs][0] = \
[exp_obs.dependent_variables[obs][0][num] for num in recreated_independent_vars]
[docs]
def engine_recovery_from_equil(self, n_steps: int, verbose: bool,
output_log: str, work_dir: str, **settings: dict) -> None:
"""
Handles an MDEngineError thrown by the MD engine. Currently this error is only raised by
LAMMPS.
The time_step is reduced, and the engine cleared, and then an equilibration is performed.
If the equilibration is unsuccessful when the attempt limit is reached, an MDEngineError is
raised.
Parameters
----------
n_steps : int
Number of simulation steps to run.
verbose: bool, optional
Whether to print statements upon starting and completing the run.
output_log: str, optional
Log file for the MD engine to write to.
work_dir: str, optional
Working directory for the MD engine to write to.
Returns
-------
None
"""
counter = 0
equil_works = False
attempt_limit = self.settings['time_step_reductions']
while not equil_works and counter < attempt_limit:
self.trial_reduce_time_step(reduction_factor=0.6)
self.reset_engine()
try:
if not n_steps:
self.simulation.auto_equilibrate()
else:
self.simulation.run(n_steps,True, verbose, output_log, work_dir,**settings)
equil_works = True
except MDEngineError:
equil_works = False
counter += 1
# if time_step was altered during equil, revert back to original value
self.simulation.time_step = self.production_time_step
if not equil_works:
raise MDEngineError
[docs]
def reset_engine(self) -> None:
"""
Clears and resets the MD engine when there is an error thrown by the
equilibration or production methods. Currently this is only applicable to LAMMPS.
"""
self.simulation.engine.clear()
# pylint: disable=protected-access
# it is necessary to reset and setup the MD engine again
self.simulation._setup()
[docs]
def trial_reduce_time_step(self,reduction_factor) -> None:
"""
Reduces the time_step by the reduction factor specified.
Parameters
----------
reduction_factor : float
represents the value which will be multiplied with the time_step in order to reduce it
Returns
-------
None
"""
self.simulation.time_step *= reduction_factor
[docs]
def calculate_max_FoM(self):
"""
Calculates a maximum Figure of Merit value by comparing a set of experimental observable
data, to arrays consisting of numbers close to zero. For use when the MD Engine fails with
a set of parameter values.
Returns
-------
max_FoM : int
value of maximum FoM
"""
# pylint: disable=protected-access
# the purpose of method is to create this empty observable so accessing is necessary
self.observable_pairs[0].MD_obs._dependent_variables = {}
self.observable_pairs[0].MD_obs._dependent_variables['SQw'] = 1e-9
max_FoM = self.FoM_calculator.calculate()
self.observable_pairs[0].MD_obs._dependent_variables = None
return max_FoM