Source code for MDMC.trajectory_analysis.compact_trajectory

"""
Module for memory-efficient MD trajectory handling.

Notes
-----
Seeing how the current (as of August 2022) trajectory implementation
consumes a lot of memory, an attempt is being made to build an object
that will contain the bare minimum of functionality, to show the
limits of performance that we can achieve within Python.
"""

# The main idea is that, ultimately, we need to store
# 3 * n_atoms * n_steps
# floating point numbers if we want to process a trajectory.
# Independent of the programming language we use, we can expect
# that each coordinate will be at least a 32-bit float, or
# a 64-bit float if we use the default value normally picked
# by numpy.

import numpy as np

from MDMC.common import units
from MDMC.MD.structures import Atom
from MDMC.trajectory_analysis.trajectory import TemporalConfiguration


[docs] class CompactTrajectory: """ Store an MD trajectory in numpy arrays. The units are system units. The normal workflow for a CompactTrajectory is: 1. Create and allocate memory. .. code-block:: python traj = CompactTrajectory(n_step, n_atoms, useVelocity) or traj = CompactTrajectory() traj.preAllocate(n_step, n_atoms, useVelocity) 2. Write values into arrays. .. code-block:: python for n in frame_numbers: if not traj.validateTypes(atom_types): traj.WriteOneStep(step_number = n, time = time_value, positions = pos_values) traj.setDimensions(box_dimensions, step_number = n) 3. Add chemical element information. .. code-block:: python traj.labelAtoms(atom_symbols, atom_masses) traj.setCharges(atom_charges) 5. Check internal consistency, trim arrays. .. code-block:: python traj.postProcess() Parameters ---------- n_steps : int, optional Number of simulation steps in the trajectory. Defaults to 0. ``n_steps = 0`` means that ``preAllocate`` will have to be run separately after the ``CompactTrajectory`` has been created. ``n_steps > 0`` means that ``preAllocate`` will be run immediately in the ``__init__`` function. n_atoms : int, optional Number of atoms in the system. Defaults to 1. useVelocity : bool, optional If the trajectory contains velocities, set to ``True`` to allocate an additional array for the velocity values. Defaults to False. bits_per_number : int, optional Set number of bytes in floating point numbers. **settings : dict Extra options. """ def __init__(self, n_steps: int = 0, n_atoms: int = 1, useVelocity: bool = False, bits_per_number: int = 8, **settings: dict): # The development plan is to use the units defined here to calculate conversion factors, # and use these factors when writing the numbers into the arrays. # For now, we define the units here: self.position_unit = units.SYSTEM['LENGTH'] self.time_unit = units.SYSTEM['TIME'] self.velocity_unit = units.SYSTEM['LENGTH'] / units.SYSTEM['TIME'] self.dtype = self._get_dtype(bits_per_number) # this sets the data type to np.float64 # The underlying assumption is that the number of atoms, # and the atom types, stay CONSTANT within the trajectory, # and so we define them as header data, and not separately for every frame: self.n_atoms = n_atoms self.n_steps = n_steps self.atom_types = [] # atom types defined as numbers, following the MD engine definition self.atom_masses = [] # atom masses, floating point numbers, one for each atom self.atom_charges = [] # atom charges, floating point numbers, one for each atom # The initial value of self.dimensions is set to a number too low to be physical, # but different from 0. This way if an observable tried to calculate the Q vectors # or, more precisely, reciprocal space vectors # from an unpopulated CompactTrajectory, it would not divide by zero. self.dimensions = 0.1*np.ones(3) # avoids divide-by-zero errors, explained above. self.changing_dimensions = None self.element_list = [] # chemical element (str) defined for each atom self.element_set = set() # set of chemical elements (str) present in the trajectory # key point: the data! # this is where we will keep the numpy arrays # vvvvvvvvvvvvvvvvvv self.position = None self.velocity = None self.time = None # now some state indicators: self.is_allocated = False # preAllocate has been run, numpy arrays exist self.is_populated = False # numpy arrays are not empty; some data have been written self.is_fixedbox = True # the dimensions of the simulation box are fixed; self.has_velocity = useVelocity # ^^^^^^^^^^^^^^ # the self.is_fixedbox could potentially be False for an NPT ensemble # in that case self.changing_dimensions will contain the box dimensions for each step. self.first_index = 1e5 # the first array index at which we have written data self.last_index = 0 # the last array index at which we have written data # ^^^^^^^^^^^^^ # since we use np.empty, the elements of the array are _not_ initialised to any value, # so it is a precaution to slice the arrays at the end to cut off the empty parts. # ------------- # now we allocate the arrays # vvvvvvvvvvvvv if n_steps > 0: self.preAllocate(n_steps=n_steps, n_atoms=n_atoms, useVelocity=useVelocity) try: self.universe = settings['universe'] except KeyError: self.universe = None @staticmethod def _get_dtype(bpn: int) -> np.dtype: """ Get appropriate numpy type from bits per number. Parameters ---------- bpn : int Bytes in floating point. Returns ------- numpy.dtype Floating point datatype. """ if bpn > 8: return np.float128 if bpn > 4: return np.float64 if bpn > 2: return np.float32 return np.float16
[docs] def setBytesPerNumber(self, bytes_per_number: int = 8): """ Change the number of bytes per number in the arrays. This is used for storing atom positions, velocities, the frame timestamps and simulation box dimensions. The best approach is to set the correct value before populating the arrays, but it is still possible to change the data type using this function when the ``CompactTrajectory`` already contains some values. Parameters ---------- bytes_per_number : int, optional If 8, the arrays will use :any:`np.float64` to store the positions, velocities and time at each step. Will always be rounded up to the nearest multiple of 2. See Also -------- CompactTrajectory._get_dtype : Type getter. """ self.dtype = self._get_dtype(bytes_per_number) if len(self) > 0: self.time = self.time.astype(self.dtype) self.position = self.position.astype(self.dtype) if self.velocity is not None: self.velocity = self.velocity.astype(self.dtype) self.changing_dimensions = self.changing_dimensions.astype( self.dtype)
def __len__(self): if self.position is None: return 0 # Since it is not guaranteed that the postProcess method has been run, # we explicitly limit the indices to those which have been written into. # The length corresponds to the number of simulation steps. return len(self.position[self.first_index:self.last_index + 1]) def __getitem__(self, index: int | slice): # different behaviour: # a single index extracts a subtrajectory # with length 1, # while a slice with produce a subtrajectory, # which is another CompactTrajectory. try: start, stop, step = index.start, index.stop, index.step except AttributeError as exc: if index >= self.__len__(): raise IndexError("Trying to access a nonexistent time" " frame in the CompactTrajectory.") from exc return self.subtrajectory(index, index+1, 1) return self.subtrajectory(start, stop, step) def __eq__(self, other: 'CompactTrajectory') -> bool: if not self.is_allocated == other.is_allocated: return False if not self.is_populated == other.is_populated: return False if not self.is_fixedbox == other.is_fixedbox: return False if not self.has_velocity == other.has_velocity: return False are_the_same = (other.position_unit == self.position_unit and other.time_unit == self.time_unit and other.velocity_unit == self.velocity_unit and other.dtype == self.dtype) if are_the_same: are_the_same = np.all([ self.n_atoms == other.n_atoms, self.first_index == other.first_index, self.last_index == other.last_index]) else: print("Failed header comparison") if are_the_same: are_the_same = np.all([ np.allclose(other.dimensions.shape, self.dimensions.shape), np.allclose(other.position.shape, self.position.shape), np.allclose(other.time.shape, self.time.shape), np.allclose(other.changing_dimensions.shape, self.changing_dimensions.shape)]) else: print("Failed index comparison") if are_the_same: are_the_same = self.universe == other.universe else: print("Failed shape comparison") print(f"Dimensions other: {other.dimensions.shape}, self: {self.dimensions.shape}") print(f"Position other: {other.position.shape}, self: {self.position.shape}") print(f"Time other: {other.time.shape}, self: {self.time.shape}") print(f"CDim other: {other.changing_dimensions.shape}" f"self: {self.changing_dimensions.shape}") if are_the_same: if self.is_fixedbox: are_the_same = np.all([ np.allclose(other.dimensions, self.dimensions), np.allclose(other.position, self.position), np.allclose(other.time, self.time)]) else: are_the_same = np.all([ np.allclose(other.dimensions, self.dimensions), np.allclose(other.position, self.position), np.allclose(other.time, self.time), np.allclose(other.changing_dimensions, self.changing_dimensions)]) else: print("Failed universe comparison") if are_the_same: if self.velocity is not None and other.velocity is not None: are_the_same = np.allclose(self.velocity, other.velocity) else: print("Failed array comparison") if are_the_same: are_the_same = np.all([ np.all(other.atom_types == self.atom_types), other.n_atoms == self.n_atoms, np.allclose(other.atom_charges, self.atom_charges), np.allclose(other.atom_masses, self.atom_masses), other.element_list == self.element_list, other.element_set == self.element_set]) else: print("Failed velocity comparison") if not are_the_same: print(f"atom_types: {np.all(other.atom_types == self.atom_types)}") print(f"n_atoms: {other.n_atoms == self.n_atoms}") print(f"atom_charges: {np.allclose(other.atom_charges, self.atom_charges)}") print(f"atom_masses: {np.allclose(other.atom_masses, self.atom_masses)}") print(f"element_list: {other.element_list == self.element_list}") print(f"element_set: {other.element_set == self.element_set}") return are_the_same @property def velocities(self) -> np.ndarray: """ The velocity array. Added for those parts of code that use the plural form instead of singular. Returns ------- ~numpy.ndarray Velocities array. """ return self.velocity @property def times(self) -> np.ndarray: """ The time array. Added for those parts of code that use the plural form instead of singular. Returns ------- ~numpy.ndarray Time samples array. """ return self.time @property def positions(self) -> np.ndarray: """ The position array. Added for those parts of code that use the plural form instead of singular. Returns ------- ~numpy.ndarray Time samples array. """ return self.position @property def data(self) -> np.ndarray: """ The step numbers and time steps in a single array. Returns ------- ~numpy.ndarray Concatenation of separate data arrays. """ return np.column_stack([ np.arange(self.n_steps), self.time, # there used to be a TemporalConfiguration here as well. ])
[docs] def preAllocate(self, n_steps: int = 1, n_atoms: int = 1, useVelocity: bool = False): """ Allocate array space for data. Creates empty arrays for storing atom positions, velocities, time values of the simulation frames, and the simulation box dimensions. The numpy empty object do not initialise elements until they have been accessed by a read or write operation. This means that allocating too many steps is generally safe and harmless, as the unused steps will be removed when the postProcess method is called. Parameters ---------- n_steps : int, optional Number of simulation steps in the trajectory. Defaults to 1. n_atoms : int, optional Number of atoms in the system. Defaults to 1. useVelocity : bool, optional If the trajectory contains velocities, set to `True` to allocate an additional array for the velocity values. Defaults to False. """ if self.is_allocated: print("WARNING: preAllocate has already been run on this CompactTrajectory.") self.n_atoms = n_atoms # from now on we decide that each step will have n_atoms self.n_steps = n_steps # and there will be n_steps in this trajectory. # time steps are the first dimension of the arrays. shape = (n_steps, n_atoms, 3) self.time = np.empty(n_steps, dtype=self.dtype) self.position = np.empty(shape, dtype=self.dtype) if useVelocity: self.velocity = np.empty(shape, dtype=self.dtype) # First change of state: the arrays have been allocated. self.is_allocated = True self.changing_dimensions = np.empty((n_steps, 3), dtype=self.dtype)
# ^^^^^^^^^^^^^^^^^^^^^^ # We are fairly confident that the dimensions of the simulation box will NOT change # during the simulation, but allocating this array does not really cost us anything, # and, if it turns out that the dimensions do change, we will be prepared.
[docs] def setCharge(self, charge_list: list[float] = None) -> bool: """ Set the values of partial charge for each atom. It assumes that the charges are constant within the simulation. Parameters ---------- charge_list : list[float] Fractional electrical charge of each atom, given in a list with one floating point number per atom. Returns ------- bool Whether charges have been set. """ # If need be, we can modify it to work the same # as setDimensions, so it populates a time-dependent # array if it turns out that the charge values # change between the time frames. if charge_list is None: return False if not len(charge_list) == self.n_atoms: raise ValueError("Wrong number of charges in setCharge.") self.atom_charges = np.array(charge_list) return True
[docs] def setDimensions(self, frame_dimensions: np.ndarray = None, step_num: int = -1): """ Write the simulation box dimensions into the object header. Additionally, if the simulation box dimensions change with time, it will keep track of the new dimensions at each step in the self.changing_dimensions object. Parameters ---------- frame_dimensions : numpy.ndarray 3 float numbers defining the size of the simulation box along x, y and z. step_num : int The number of the simulation frame at which the frame_dimensions array was read. """ # The initial self.dimensions is set to 0.1 Angstrom. # This is too small to ever be used in a simulation, and at the same time # it is not 0. # Here we check if the initial values has already been overwritten with # a physical value of dimensions. if np.all(np.abs(self.dimensions - 0.1) < 1e-5): self.dimensions = frame_dimensions self.changing_dimensions[0] = frame_dimensions elif np.allclose(frame_dimensions, self.dimensions, rtol=1e-6, atol=1e-4): # If the new dimensions are the same as the current ones, we do nothing. pass else: self.is_fixedbox = False # the dimensions of the box DO change in this trajectory! # we save the dimensions self.changing_dimensions[step_num] = frame_dimensions # per simulation step now. self.dimensions = self.changing_dimensions[self.first_index:self.last_index+1].mean( 0)
# ^^^^^^^^^^^^^ # now that we have discovered that the dimensions change, # we use the mean value over time as the dimensions parameter.
[docs] def writeOneStep(self, step_num: int = -1, time: float = -1.0, positions: np.array = None, velocities: np.array = None): """ Write the atom properties of a single frame into the trajectory arrays. Parameters ---------- step_num : int, optional The index at which the numbers will be written. Defaults to -1. time : float The time stamp of the simulation step, in the correct time units (femtoseconds). Defaults to -1.0. positions : np.array The array of the atom positions, shaped (n_atoms, 3). Defaults to None. velocities : np.array, optional The array of the atom velocities, shaped (n_atoms, 3). If we don't use velocities, it can be skipped. Defaults to None. """ if not self.is_allocated: raise IndexError("Writing outside of the reserved array range.") self.time[step_num] = time self.position[step_num, :, :] = positions if velocities is not None: self.velocity[step_num, :, :] = velocities # some housekeeping: # we take note of the indices that have been written to. # In case we allocated more memory than needed, # we keep track of the indices where the data has been written, # so we can discard the unused part later, # as the uninitalised part of the array will contain random numbers. self.first_index = min(step_num, self.first_index) self.last_index = max(step_num, self.last_index)
[docs] def writeEmptyStep(self, step_num: int = -1, time: float = -1.0): """ Advance the iterators without writing any data. It is basically a reduced version of the writeOneStep method. This was added since the tests/trajectory_analysis/test_PDF.py use a trajectory made of 1000 _empty_ configurations. Parameters ---------- step_num : int, optional The index at which the numbers will be written. Defaults to -1. time : float, optional The time stamp of the simulation step, in the correct time units (femtoseconds). Defaults to -1.0. """ if not self.is_allocated: raise IndexError("Writing outside of the reserved array range.") self.time[step_num] = time self.first_index = min(step_num, self.first_index) self.last_index = max(step_num, self.last_index)
def _create_types_from_elements(self, atom_types: list[str]) -> np.ndarray: """ Translate IDs to elements. Parameters ---------- atom_types : list[str] Mapping from from elements to integers. Returns ------- numpy.ndarray List of new IDs. Notes ----- Some engines (e.g. LAMMPS) like using numbers at atom types, while others use letters. CompactTrajectory was written for LAMMPS initially, and likes having numbers for types. This function will check if the types are numbers, and replace them with consitent positive numbers if it turns out that they were not. """ if len(atom_types) == 0: return np.array([]) try: int(atom_types[0]) except ValueError: all_types = np.unique(atom_types) compare_types = np.array(atom_types, dtype=str) number_types = np.zeros(len(atom_types), dtype=np.int64) for number, at_type in enumerate(all_types, 1): number_types[np.where(compare_types == at_type)] = number return number_types return np.array(atom_types)
[docs] def validateTypes(self, raw_atom_types: np.array) -> bool: """ Check and set the array of atom types. If previously run, verify the array of atom types from the new frame is the same as the original array of atom types. If the atom types have changed during the simulation, we cannot process the results using the ``CompactTrajectory`` object, and the validation will return ``False``. It is up to the engine facade to decide what to do if ``validateTypes`` returns ``False``. Parameters ---------- raw_atom_types : numpy.array An array of all the atom types, sorted by the atom ID. If these are numbers, like in a LAMMPS simulation, then they will be stored directly. If ``raw_atom_types`` are strings, an internal method called ``_create_types_from_elements`` will create numbers that correspond to the strings, so that every atom type has its own number. Returns ------- bool Whether the atom types are unchanged. """ # codes like LAMMPS make it possible to generate or destroy atoms/particles # in a simulation to simulate flow. It is unlikely to happen in an MDMC run. # Since we cannot handle such a case, # we will return False to indicate that this trajectory is not suitable for MDMC. atom_types = self._create_types_from_elements(raw_atom_types) # ^^^^^^^^ # This ensures that the atom types are numers. if len(self.atom_types) == 0: # case 1: atom_types have not been set if len(atom_types) == self.n_atoms: self.atom_types = atom_types # and now they are set. return True elif np.all(self.atom_types == atom_types): # case 2: atom_types have been set return True # and have not changed return False # case 3: atom_types have been set and have changed.
[docs] def labelAtoms(self, atom_symbols: dict[int, str] = None, atom_masses: dict[int, float] = None) -> bool: """ Create internal atom references. - A `list` of chemical elements of all the atoms in a trajectory frame, - a `set` of all the chemical elements present in a trajectory, - a `list` of atom masses of all the atoms in a trajectory frame. It is up to the engine facade to construct the input dictionaries containing the correct information about the chemical elements and masses. Parameters ---------- atom_symbols : dict[int, str] Mapping from trajectory atom_ID to chemical symbol. atom_masses : dict[int, float] Mapping from trajectory atom_ID to chemical mass. Returns ------- bool References created successfully. """ if len(self.atom_types) == 0: return False # tests/trajectory_analysis/test_PDF.py use empty Configurations # so we accept a case of no atoms in the CompactTrajectory. # We just return False in case we wanted to check in real code if # we are trying to set labels on a CompactTrajectory with no atoms. self.element_list = [str(atom_symbols[atom_id]) for atom_id in self.atom_types] self.element_set = set(self.element_list) self.atom_masses = np.array([atom_masses[atom_id] for atom_id in self.atom_types]) return True
[docs] def postProcess(self): """ Finalise trajectory analysis. Notes ----- This function should be called after the all the trajectory steps have been read. It will discard the unnecessary rows of the arrays, in case we had allocated too many due to some rounding error. """ if not self.is_populated: self.position = self.position[self.first_index:self.last_index+1] self.time = self.time[self.first_index:self.last_index+1] if self.velocity is not None: self.velocity = self.velocity[self.first_index:self.last_index+1] self.changing_dimensions = self.changing_dimensions[self.first_index:self.last_index+1] self.first_index = 0 self.last_index = len(self.position)-1 self.is_populated = True
[docs] def subtrajectory(self, start: int = 0, stop: int = -1, step: int = 1, atom_filter: list[int] = None): """ Slice a ``CompactTrajectory``. Returns another ``CompactTrajectory`` instance, which contains the same header information, and a subset of the original trajectory steps. The arrays in the original trajectory will be sliced following the pattern: `new = old[start:stop:step]`. Optionally, ``atom_filter`` can be specified to choose only specific atoms from the trajectory. It is recommended not to use ``subtrajectory`` directly in this case, but to use the ``filter_by_element`` or ``filter_by_type`` methods, which will create the ``element_filter`` themselves. Parameters ---------- start : int Number defining the beginning of the slicing range. stop : int Number defining the end of the slicing range. step : int Step size of the slicing operation. atom_filter : list[int] Atom indices to extract. Returns ------- CompactTrajectory A trajectory containing the same header and the same or fewer steps than the original. See Also -------- filter_by_element : Get components by species. filter_by_type : Get components by ID. """ self.postProcess() temp = CompactTrajectory() # copy over all the transferable parts temp.position_unit = self.position_unit temp.time_unit = self.time_unit temp.velocity_unit = self.velocity_unit temp.dtype = self.dtype temp.dimensions = self.dimensions temp.universe = self.universe if atom_filter is None: temp.position = self.position[start:stop:step, :, :] temp.time = self.time[start:stop:step] temp.changing_dimensions = self.changing_dimensions[start:stop:step, :] if self.velocity is not None: temp.velocity = self.velocity[start:stop:step, :, :] temp.n_atoms = self.n_atoms temp.atom_types = self.atom_types.copy() temp.atom_masses = self.atom_masses.copy() temp.atom_charges = self.atom_charges.copy() temp.element_list = self.element_list temp.element_set = self.element_set else: temp.position = self.position[start:stop:step, atom_filter, :] temp.time = self.time[start:stop:step] temp.changing_dimensions = self.changing_dimensions[start:stop:step, :] if self.velocity is not None: temp.velocity = self.velocity[start:stop:step, atom_filter, :] temp.atom_types = self.atom_types[atom_filter] temp.n_atoms = len(temp.atom_types) temp.atom_charges = self.atom_charges[atom_filter] temp.atom_masses = self.atom_masses[atom_filter] temp.element_list = [self.element_list[x] for x in atom_filter] temp.element_set = set(temp.element_list) temp.is_allocated = True temp.is_populated = True temp.has_velocity = self.has_velocity temp.is_fixedbox = self.is_fixedbox temp.first_index = 0 temp.last_index = len(temp.position)-1 temp.n_steps = len(temp.position) temp.postProcess() return temp
[docs] def filter_by_time(self, start: float, end: float = None) -> 'CompactTrajectory': """ Filter to within time defined by ``start`` and ``end``. Create another CompactTrajectory, containing only the frames with the time values equal to or larger than the start parameter, and smaller than the end parameter. If only one time value is given, the function will create a CompactTrajectory with a single time step equal to start, or raise an error if start is not an element of the time array. Parameters ---------- start : float The start time for filtering the ``Trajectory``. end : float, optional The end time for filtering the ``Trajectory``. The default is `None`, which means the new returned ``Trajectory`` has a single time, defined by the ``start``. Returns ------- CompactTrajectory A copy with ``times`` in half open interval defined by ``start`` and ``end``. Raises ------ ValueError No valid MD frames found. """ if end is None: index = np.where(self.time == start)[0].ravel() if not index.size: raise ValueError("The specified time range contains no MD frames") return self.subtrajectory(index[0], index[0]+1) index = np.where((self.time >= start) & (self.time < end))[0].ravel() if not index.size: raise ValueError("The specified time range contains no MD frames") return self.subtrajectory(index[0], len(index))
[docs] def filter_by_element(self, elements: list[str]) -> 'CompactTrajectory': """ Filter subtrajectory by chemical symbol. Parameters ---------- elements : list[str] List of chemical elements given as string. Returns ------- CompactTrajectory A ``CompactTrajectory`` containing only the atoms of the specified chemical elements. """ indices = [] for element in elements: if element in self.element_list: index = np.where(np.array(self.element_list) == element)[0].ravel() indices.append(index) index = np.concatenate(indices) index = np.sort(index) return self.subtrajectory(0, len(self), step=1, atom_filter=index)
[docs] def filter_by_type(self, types: list[int]) -> 'CompactTrajectory': """ Filter subtrajectory by atom ID. Parameters ---------- types : list[int] A list of atom IDs. Returns ------- CompactTrajectory A ``CompactTrajectory`` containing only the atoms of the specified type. """ indices = [] for atom_type in types: if atom_type in self.atom_types: index = np.where(self.atom_types == atom_type)[0].ravel() indices.append(index) index = np.concatenate(indices) index = np.sort(index) return self.subtrajectory(0, len(self), step=1, atom_filter=index)
[docs] def exportAtom(self, step_number: int = 0, atom_number: int = 0): """ Create an ``Atom`` object for a chosen time step of the simulation and atom number. Parameters ---------- step_number : int Number of the simulation step at which the atom position should be read. atom_number : int Number of the atom in the trajectory that will be created as an ``Atom`` object. Returns ------- Atom A single ``Atom`` object. Notes ----- For compatibility with ``Trajectory``. """ try: element = self.element_list[atom_number] except AttributeError: element = '?' try: velocity = self.velocity[step_number, atom_number, :] except AttributeError: velocity = (0.0, 0.0, 0.0) try: charge = self.atom_charges[atom_number] except IndexError: charge = 0.0 return Atom(element, self.position[step_number, atom_number, :], velocity, charge=charge)
[docs] def configurations_as_compact_trajectory( *configs: list[TemporalConfiguration], ) -> CompactTrajectory: """ Populate ``CompactTrajectory`` arrays from list of ``TemporalConfiguration``. Parameters ---------- *configs : list[TemporalConfiguration] Most likely containing atoms. Returns ------- CompactTrajectory With atom types and positions matching those in the input configurations. Raises ------ TypeError No configurations provided. """ if not configs: raise TypeError("At least one Configuration is needed" " for the CompactTrajectory.fromConfigs()") traj = CompactTrajectory(n_steps=len(configs), n_atoms=len(configs[0].atoms), useVelocity=len(configs[0].atom_velocities) > 0, universe=configs[0].universe) for step_number, config in enumerate(configs): try: current_time = config.time except AttributeError: current_time = 0.0 if len(config.data) > 0: atpos = np.vstack(config.atom_positions) atvel = np.vstack(config.atom_velocities) traj.writeOneStep(step_num=step_number, time=current_time, positions=atpos, velocities=atvel) else: traj.writeEmptyStep(step_num=step_number, time=current_time) try: dim = config.universe.dimensions except AttributeError: continue else: traj.setDimensions(dim, step_num=step_number) # here we extract as much header information as possible # from the TemporalConfiguration elements = [] # list of 'chemical_element_symbol', 1 per atom # dictionary of 'chemical_element_symbol' : mass (float) in a.m.u. masses = {} # dictionary of 'atom_type' : mass (float) in a.m.u. mass_per_type = {} element_per_type = {} # dictionary of 'atom_type' : 'chemical_element_symbol' types = [] # a list of 'atom_type', 1 entry for each atom id_values = [] # a list of 'atom_ID', 1 entry for each atom atom_charge = [] # we assume -for now- that the Trajectory stores atom_type. has_types = True atom_counter = 0 # we just iterate over Atom objects for nat, atom in enumerate(configs[0].atoms): element = atom.element.symbol mass = atom.mass charge = atom.charge elements.append(element) atom_charge.append(charge) masses[element] = mass if has_types: try: types.append(atom.atom_type) except AttributeError: has_types = False id_values.append(atom.ID) atom_counter = nat + 1 if not has_types: all_elements = sorted(list(np.unique(elements))) types = [all_elements.index(x) for x in elements] for x in range(atom_counter): mass_per_type[types[x]] = masses[elements[x]] element_per_type[types[x]] = elements[x] traj.validateTypes(np.array(types)[np.argsort(id_values)]) traj.labelAtoms(element_per_type, mass_per_type) traj.setCharge(atom_charge) traj.postProcess() return traj