"""
Module for AbstractSQw and total SQw class.
"""
from contextlib import suppress
from typing import Optional
import numpy as np
from numpy.testing import assert_allclose
from scipy.interpolate import RectBivariateSpline
from MDMC.common import units
from MDMC.common.constants import h, h_bar
from MDMC.common.decorators import unit_decorator_getter
from MDMC.resolution.resolution_factory import ResolutionFactory
from MDMC.trajectory_analysis.compact_trajectory import CompactTrajectory
from MDMC.trajectory_analysis.observables.obs import Observable
from MDMC.trajectory_analysis.observables.obs_factory import ObservableFactory
from MDMC.utilities.trajectory_slicing import slice_trajectory
[docs]
class SQwMixins:
"""
A mixin class for properties used by both SQw and FQt objects.
"""
[docs]
def minimum_frames(self, dt: float = None) -> int:
r"""
Compute minimum number of ``CompactTrajectory`` frames to calculate ``dependent_variables``.
Parameters
----------
dt : float, optional
The time separation of frames in ``fs``, default is `None`.
Returns
-------
int
The minimum number of frames.
Notes
-----
Depends on ``self.use_FFT``.
If `self.use_FFT == True`, it is the number of energy steps + 1, in order to allow for
a reflection in time which only counts the end points once.
If `self.use_FFT == False`, there is not a hard minimum on number of frames. However, to
distinguish our smallest differences in energy :math:`F(Q,t)` needs to
cover at least a time period :math:`T_{min}` such that:
.. math::
T_{min} \sim \frac{h}{\Delta E_{min}}
Due to the aforementioned reflection in the time domain, to cover a
period of :math:`T_{min}` we only need :math:`N` frames:
.. math::
N = \frac{T_{min}}{2 \Delta t} + 1 = \frac{h}{2 \Delta t \Delta E_{min}} + 1
"""
nE = len(self.E)
if self.use_FFT:
return nE + 1
# Either take the smallest absolute energy, or the smallest separation
# of energies we wish to discriminate between
minimum_abs_energy = np.min(np.abs(self.E[self.E != 0]))
minimum_energy_separation = np.min(np.diff(np.sort(self.E)))
limiting_energy = min(minimum_abs_energy, minimum_energy_separation)
# h is in units of eV s whereas system units are meV fs, so apply a
# factor of 1e3 * 1e15 to convert it
required_time = h * 1e18 / limiting_energy
# We need an integer number of frames, so round up using np.ceil to
# ensure we exceed the minimum number of frames needed
return int(np.ceil(required_time / (2 * dt) + 1))
[docs]
def maximum_frames(self) -> Optional[int]:
"""
Compute maximum number of ``CompactTrajectory`` frames to calculate ``dependent_variables``.
Returns
-------
int
The maximum number of frames.
Notes
-----
Depends on `self.use_FFT`.
If `True`, it is the number of energy steps + 1, in order to allow for
a reflection in time which only counts the end points once.
Otherwise, there is no limit and all frames will contribute to the
calculation.
"""
if self.use_FFT and (self.E is not None):
return len(self.E) + 1
return None
@property
@unit_decorator_getter(unit=units.LENGTH ** -1)
def Q(self) -> Optional[np.array]:
"""
Get or set the momentum transfers.
Returns
-------
numpy.array
1D array of Q `float` (in ``Ang^-1``).
"""
try:
return self.independent_variables['Q']
except KeyError:
return None
@Q.setter
def Q(self, value: np.array) -> None:
self.independent_variables['Q'] = value
[docs]
class AbstractSQw(SQwMixins, Observable):
"""
An abstract class for total, coherent and incoherent dynamic structure factors.
The equations used for calculating this are based on Kneller *et al*. [Kneller]_.
.. note::
Properties for MD frames & Q are found in the :class:`SQwMixins` class.
References
----------
.. [Kneller] Kneller *et al*. Comput. Phys. Commun. 91 (1995) 191-214.
"""
def __init__(self):
super().__init__()
self._independent_variables = None
self._dependent_variables = None
self._errors = None
self.resolution = None
# Use FFT by default
self._use_FFT = True
self._recreated_Q = None
@property
def independent_variables(self) -> dict:
"""
Get or set the independent variables.
These are:
- the frequency :math:`Q` (in ``Ang^-1``)
- energy :math:`E` (in``meV``)
Returns
-------
dict
The independent variables.
"""
return self._independent_variables
@independent_variables.setter
def independent_variables(self, value: dict) -> None:
self._independent_variables = value
@property
def dependent_variables(self) -> dict:
"""
Get the dependent variables.
This is:
- SQw, the dynamic structure factor (in ``arb``)
Returns
-------
dict
The dependent variables.
"""
return self._dependent_variables
@property
def errors(self) -> dict:
"""
Get or set the errors on the dependent variables.
The dynamic structure factor (in ``arb``)
Returns
-------
dict
The errors on the ``dependent_variables``.
"""
return self._errors
@errors.setter
def errors(self, value: dict) -> None:
self._errors = value
@property
@unit_decorator_getter(unit=units.ENERGY_TRANSFER)
def E(self) -> 'np.array':
"""
Get the energies.
Returns
-------
numpy.array
1D array of energy `float` (in ``meV``).
"""
if self.independent_variables:
try:
return self.independent_variables['E']
except KeyError:
pass
return None
@property
@unit_decorator_getter(unit=units.Unit('ps') ** -1)
def w(self) -> 'np.array':
"""
Get the angular frequencies.
Returns
-------
numpy.array
1D array of angular frequency (in ``1 / ps``).
"""
return self.E / (h_bar * 1e15)
@property
@unit_decorator_getter(unit=units.ARBITRARY)
def SQw(self) -> Optional[list[np.ndarray]]:
r"""
Get the dynamic structure factor, :math:`S(Q, \omega{})` (in ``arb``).
Returns
-------
list of numpy.ndarray
`list` of 2D arrays of :math:`S(Q, \omega{})` (in arbitrary units).
"""
try:
return self.dependent_variables['SQw']
except KeyError:
return None
@property
@unit_decorator_getter(unit=units.ARBITRARY)
def SQw_err(self) -> Optional[list[np.ndarray]]:
r"""
Get the errors on the dynamic structure factor (in ``arb``).
Returns
-------
list of numpy.ndarray
`list` of 2D arrays of :math:`S(Q, \omega{})` error (in arbitrary units).
"""
try:
return self.errors['SQw']
except KeyError:
return None
@property
def recreated_Q(self) -> np.ndarray:
"""
Get the indices of the recreated Q_values.
Returns
-------
numpy.ndarray
Array of recreated Q values.
"""
return self._recreated_Q
@recreated_Q.setter
def recreated_Q(self, recreated_Q_pos: list):
self._recreated_Q = recreated_Q_pos
[docs]
def validate_energy(self, time_step: float = None):
"""
Ensure ``dt`` is valid for computing energies.
Asserts that the user set frame separation ``dt`` leads to energy
separation that matches that of the experiment. If not, it
changes the time step and trajectory step to fix this. The time step value is
prioritised here.
Parameters
----------
time_step : float, optional
User specified length of time for each update of the atoms trajectories
in the simulation, default is None.
Returns
-------
valid : bool
Whether ``dt`` is valid.
traj_step : Optional[int]
Ideal ``traj_step`` if provided.
time_step : Optional[float]
Ideal ``time_step`` if provided.
dt : float
Required ``dt``.
"""
dt_required = self.calculate_dt()
if time_step is not None:
# Changing the time and traj step to fit the required dt value
# by finding the highest traj_step that can fit into the dt_required
traj_step = int(np.round(dt_required/time_step))
if traj_step == 0:
traj_step += 1
time_step = dt_required/traj_step
return True, traj_step, time_step, dt_required
return False, None, None, dt_required
[docs]
def calculate_from_MD(self,
MD_input: CompactTrajectory,
verbose: int = 0,
**settings: dict):
"""
Calculate the dynamic structure factor, S(Q, w) from a ``CompactTrajectory``.
If the ``CompactTrajectory`` has more frames than the
``self.maximum_frames()`` that can be used to recreate the
grid of energy points, it can slice the ``CompactTrajectory``
into sub-trajectories of length ``self.maximum_frames()``,
with the slicing specified through the settings
``use_average`` and ``cont_slicing``.
The ``independent_variable`` ``Q`` can either be set
previously or defined within ``**settings``.
Parameters
----------
MD_input : CompactTrajectory
An MDMC ``CompactTrajectory`` from which to calculate ``SQw``.
verbose : int, optional
The level of verbosity:
- Verbose level 0 gives no information.
- Verbose level 1 gives final time for the whole method.
- Verbose level 2 gives final time and also a progress bar.
- Verbose level 3 gives final time, a progress bar, and time per step.
**settings : dict
Extra options.
Other Parameters
----------------
n_Q_vectors : int
The maximum number of ``Q_vectors`` for any ``Q`` value. The
greater the number of ``Q_vectors``, the more accurate the
calculation, but the longer it will take.
dimensions : list, tuple or numpy.ndarray
A 3 element `tuple` or ``array`` of `float` specifying the
dimensions of the ``Universe`` in units of ``Ang``.
energy_resolution : dict
Specify energy resolution and function in units
of ueV (micro eV), in the format of the one-line dict
{'function': value}, where `function` is the resolution
function and `value` is the desired `FWHM`.
For example, to pass a Gaussian resolution of 80ueV we use
{'gaussian': 80}.
Currently accepted functions are 'gaussian' and
'lorentzian' Can also be 'lazily' given as `float`, in
which case it is assumed to be Gaussian.
Q_values : numpy.ndarray
1D array of Q `float` (in ``Ang^-1``).
use_average : bool
Optional parameter if a list of more than one ``Trajectory``
is used.
If set to True (default) then the mean value for S(Q, w)
is calculated. Also, the errors are set to the standard
deviation calculated over the list of
``CompactTrajectory`` objects.
cont_slicing : bool
Flag to decide between two possible behaviours when the
number of ``MD_steps`` is larger than the minimum required
to calculate the observables.
If ``False`` (default) then the ``CompactTrajectory`` is
sliced into non-overlapping sub-``CompactTrajectory``
blocks for each of which the observable is calculated.
If ``True``, then the ``CompactTrajectory`` is sliced into
as many non-identical sub-``CompactTrajectory`` blocks as
possible (with overlap allowed).
"""
self._origin = 'MD'
SQw_list = []
use_average = settings.get('use_average', True)
cont_slicing = settings.get('cont_slicing', False)
try:
override_dimensions = settings['dimensions']
except KeyError:
pass
else:
MD_input.setDimensions(np.array(override_dimensions))
# adds resolution attribute if it doesn't already exist
if self.resolution is None:
resolution_factory = ResolutionFactory()
if 'energy_resolution' in settings:
self.resolution = resolution_factory.create_instance(
settings['energy_resolution'])
else:
# if no resolution supplied, give the object null resolution
self.resolution = resolution_factory.create_instance(None)
# Create independent_variables dictionary if it doesn't exist
if not hasattr(self, 'independent_variables'):
self.independent_variables = {}
# Extract information that should be constant throughout the trajectory and hence the
# subtrajectories (if there are any)
t = MD_input.times - MD_input.times[0]
dt = t[1] - t[0]
if self.maximum_frames():
t = t[0:self.maximum_frames()]
if MD_input.is_fixedbox:
try:
self.universe_dimensions = MD_input.dimensions
except AttributeError:
try:
self.universe_dimensions = np.array(settings['dimensions'])
except KeyError as error:
raise AttributeError('Either trajectory requires a dimensions '
'attribute or dimensions must be passed '
'when calling calculate_from_MD') from error
else: # If the dimensions of the simulation box are not fixed, we use the first value
try:
self.universe_dimensions = MD_input.changing_dimensions[0]
except AttributeError:
try:
self.universe_dimensions = np.array(settings['dimensions'])
except KeyError as error:
raise AttributeError('Either trajectory requires a dimensions '
'attribute or dimensions must be passed '
'when calling calculate_from_MD') from error
# Test that, if there is an existing E, it is consistent with E
# calculated from trajectory times
if self.E is not None:
self.validate_energy(dt)
elif self.independent_variables:
self.independent_variables['E'] = self.calculate_E(len(t) - 1, dt)
else:
self.independent_variables = {'E': self.calculate_E(len(t) - 1, dt)}
# Overwrite independent variable 'Q' if it already exists
with suppress(KeyError):
self.independent_variables['Q'] = np.array(settings['Q_values'])
# Slice trajectory up if possible and requested by user:
if self.maximum_frames() and use_average:
trajectories = slice_trajectory(trj=MD_input, subtrj_len=self.maximum_frames(),
cont_slicing=cont_slicing)
trj_sliced = True
else:
trajectories = [MD_input]
trj_sliced = False
obtained_recreated_Q = False
# Perform calculations for each trajectory
for trajectory in trajectories:
self.trajectory = trajectory
# Assert that the times and dimensions are consistent with original trajectory
if trj_sliced:
try:
assert_allclose(self.trajectory.times -
self.trajectory.times[0], t, rtol=1e-7, atol=1e-3)
except AssertionError as error:
msg = ('The `times` of the current `CompactTrajectory` were not '
'consistent with the first `CompactTrajectory` passed')
raise AssertionError(msg) from error
try:
assert_allclose(self.universe_dimensions,
self.trajectory.dimensions)
except AttributeError:
# May not have dimensions set, in which case pass
pass
except AssertionError as error:
msg = ('The `dimensions` of the current `CompactTrajectory` were not '
'consistent with the first `CompactTrajectory` passed')
raise AssertionError(msg) from error
fqt_type = self._get_fqt_type()
# instantiate an FQt object for FQt calculations
FQt = ObservableFactory.create_observable(fqt_type)
FQt.Q = self.Q
# calculate FQt
FQt.calculate_from_MD(trajectory, **settings)
self.Q = FQt.Q
if not obtained_recreated_Q:
self._recreated_Q = FQt.recreated_Q
obtained_recreated_Q = True
SQw_list.append(FQt.calculate_SQw(self.E, self.resolution))
# Cleanup the trajectory to reduce memory usage
self.trajectory = None
# calculate average and errors
SQw_output = [np.mean(SQw_list, axis=0)]
errors_output = [np.std(SQw_list, axis=0)]
self._dependent_variables = {'SQw': SQw_output}
self._errors = {'SQw': errors_output}
def _get_fqt_type(self) -> str:
"""
Get the name of the FQt type associated with each SQw type.
Returns
-------
str
FQt associated with SQw.
"""
fqt_types = {'SQw': 'FQt',
'SQwCoherent': 'FQt_coh',
'SQwIncoherent': 'FQt_incoh'}
return fqt_types[self.__class__.__name__]
[docs]
@staticmethod
def calculate_E(nE: int, dt: float) -> np.ndarray:
r"""
Compute energy from MD.
Calculate an array of ``nE`` uniformly spaced energy values from the
time separation of the ``CompactTrajectory`` frames, ``dt``.
The frequencies are determined by the fast Fourier transform,
as implemented by numpy, for ``2 * nE`` points in time which
are truncated to only include ``nE`` positive frequencies.
As we are dealing with frequency rather than angular frequency
here, the relation to between energy is given by:
.. math::
E = h \nu
Parameters
----------
nE : int
The number of energy values to be calculated.
dt : float
The step size between frames in ``fs``.
Returns
-------
numpy.ndarray
An ``array`` of `float` specifying the energy in units of ``meV``.
See Also
--------
numpy.fft.fftfreq : Frequency calculation.
"""
# h is in units of eV s whereas system units are meV fs, so apply a
# factor of 1e3 * 1e15 to convert it
return h * 1e18 * np.fft.fftfreq(2 * int(nE), dt)[:int(nE)]
[docs]
def calculate_dt(self) -> float:
r"""
Calculate timestep required by experimental dataset assuming uniform spacing.
.. note::
This may be different from the time separation that the
user has given as an input, as it only depends on the
current values for ``self.E``.
The relationship between time and energy comes from the numpy
implementation of the FFT for ``2 * nE`` points where:
.. math:: \nu_{max} &=& \frac{n_E - 1}{2 n_E \Delta t} \\\\
\therefore \Delta t &=& \frac{h (n_E - 1)}{2 n_E E_{max}}
Returns
-------
float
The time separation required by the current values of ``self.E``.
See Also
--------
numpy.fft.fft : Numpy FFT implementation.
"""
# h is in units of eV s whereas system units are meV fs, so apply a
# factor of 1e3 * 1e15 to convert it
nE = len(self.E)
return h * 1e18 * (nE - 1) / (2 * nE * (np.max(np.abs(self.E))))
[docs]
def calculate_resolution_functions(self, dt: float) -> dict:
"""
Generate resolution function in momentum and time.
Parameters
----------
dt : float
The time spacing for the inverse Fourier transform (in ``fs``).
Ideally this should be the same as the frame separation
expected when applying this function.
Returns
-------
dict
A dictionary with the key 'SQw' corresponding to function
which accepts arrays of time and momentum (respectively)
and returns a 2D array of values for the instrument
resolution.
Notes
-----
There are several caveats to this function a user should be aware of:
- This uses the ``SQw`` values of the ``Observable`` it is
called from, and so should only be called for an observable
which has been created from relevant resolution data,
i.e. a vanadium sample.
- If this resolution function is used on data outside
its original range, then it will use nearest neighbour
extrapolation.
- The input will be reflected in the time/energy domain as
symmetry about 0 is assumed.
If for whatever reason this is not appropriate for the data in
question, this function should not be used.
"""
# NB: this function is only used by methods in the FileResolution object
# (see MDMC.resolution.from_file)
# but it hasn't been moved to that file
# because it relies so heavily on the SQw object's attributes
# that moving it over is a nightmare
# Remove any momentum values with infinite error, and the corresponding values from SQw
error = self.SQw_err
masking = np.where(np.any(error == float('inf'), axis=-1))
SQw_cropped = np.delete(self.SQw[0], masking, axis=0)
Q_cropped = np.delete(self.Q, masking)
# Enforce symmetry in the energy/time domain of the data. Note that if the original data
# included both positive and negative values, we will end up with more densely spaced
# values after this reflection.
E_reflected = np.append(self.E, -1 * self.E)
SQw_reflected = np.append(SQw_cropped, SQw_cropped, axis=-1)
# Sort the data so the width of each bin can be calculated using argsort so we can apply
# the same re-ordering to SQw. Because we cannot calculate the width of the first or last
# value, these are discarded from the energy and SQw arrays.
sorted_indices = np.argsort(E_reflected)
E_sorted = E_reflected[sorted_indices]
widths = (E_sorted[2:] - E_sorted[:-2]) / 2
E_sorted = E_sorted[1:-1]
SQw_sorted = SQw_reflected[:, sorted_indices[1:-1]]
# Perform a (slow) inverse discrete Fourier transform now, with all available data, rather
# than with potentially coarse time sampling during the SQw calculation
# Create time array with the spacing dt, and a min/max value of the Nyquist limit.
# h is in units of eV s whereas system units are meV fs, so
# apply a factor of 1e3 * 1e15 to convert it
max_energy_separation = np.amax(np.diff(E_sorted))
t_max = h * 1e18 / (2 * max_energy_separation)
N_T = int(t_max / dt)
t_array = np.linspace(- dt * N_T, dt * N_T, N_T)
SQw_ift = np.zeros((len(SQw_sorted), N_T), dtype='complex')
# In general we do not have equal energy spacing, multiply the exponential factor by this
# before transposing and dotting to sum over the energy domain
exp = np.exp(1j * np.outer(t_array, E_sorted) /
(h_bar * 1e18)) * widths
SQw_ift = np.dot(SQw_sorted, np.transpose(exp))
# Because Observable.dependent_variables_structure gives the order in which the
# independent variables are represented in the np.shape of the data, we have to
# reverse the order of the x and y arrays for RectBivariateSpline.
# RectBivariateSpline does not return complex numbers,
# so define a new function that combines the real and imaginary parts
def data_interpol(t, Q) -> np.ndarray:
"""
Internal function for interpolating complex t & Q.
Parameters
----------
t : numpy.ndarray
Time domain.
Q : numpy.ndarray
Momentum domain.
Returns
-------
np.ndarray
Interpolated complex t & Q data.
See Also
--------
scipy.interpolate.RectBivariateSpline : Interpolation mechanism.
Notes
-----
RectBivariateSpline does not return complex numbers, so
this function exists to emulate complex interpolation.
"""
real = RectBivariateSpline(t_array, Q_cropped, np.real(SQw_ift).T)
imag = RectBivariateSpline(t_array, Q_cropped, np.imag(SQw_ift).T)
return (real(t, Q) + 1j * imag(t, Q)).T
return {'SQw': data_interpol}
@property
def dependent_variables_structure(self) -> dict[str, list]:
"""
The order in which the 'SQw' dependent variable is indexed in terms of 'Q' and 'E'.
The purpose of this method is to ensure consistency
between different readers/methods which create ``SQw`` objects.
Returns
-------
dict[str, list]
The shape of the SQw dependent variable.
Notes
-----
Explicitly: we have that self.SQw[Q_index, E_index] is the data point for
given indices of self.Q and self.E.
It also means that:
np.shape(self.SQw)=(np.size(self.Q), np.size(self.E)).
"""
return {'SQw': ['Q', 'E']}
@property
def uniformity_requirements(self) -> dict[str, dict[str, bool]]:
"""
Get restrictions required for computing E & Q.
Captures the current limitations on the energy 'E' and reciprocal
lattice points 'Q' within the dynamic structure factor ``Observables``.
If using FFT, then 'E' must be uniform and start at zero, otherwise it
has no restrictions. 'Q' must be uniform but does not need to start at
zero.
Returns
-------
dict[str, dict[str, bool]]
Dictionary of uniformity restrictions for 'E' and 'Q'.
"""
if self.use_FFT:
e_requirements = {'uniform': True, 'zeroed': True}
else:
e_requirements = {'uniform': False, 'zeroed': False}
return {'E': e_requirements, 'Q': {'uniform': True, 'zeroed': False}}
[docs]
@ObservableFactory.register(('DynamicStructureFactor', 'SQw'))
class SQw(AbstractSQw):
"""
A class for the total dynamic structure factor.
Calculation is done in the respective FQt object, and this is
just a reference to get the correct FQt object.
"""
[docs]
@ObservableFactory.register(('CoherentDynamicStructureFactor',
'SQwCoherent',
'SQwCoh',
'SQw_coh'))
class SQwCoherent(AbstractSQw):
"""
A class for the coherent dynamic structure factor.
"""
[docs]
@ObservableFactory.register(('IncoherentDynamicStructureFactor',
'SQwIncoherent'
'SQwIncoh',
'SQw_incoh'))
class SQwIncoherent(AbstractSQw):
"""
A class for the incoherent dynamic structure factor.
"""