Source code for MDMC.trajectory_analysis.compact_trajectory

"""Module for memory-efficient MD trajectory handling.
Seeing how the current (as of August 2022) trajectory implementation
consumes a lot of memory, an attempt is being made to build an object
that will contain the bare minimum of functionality, to show the
limits of performance that we can achieve within Python.
"""

# The main idea is that, ultimately, we need to store
# 3 * n_atoms * n_steps
# floating point numbers if we want to process a trajectory.
# Independent of the programming language we use, we can expect
# that each coordinate will be at least a 32-bit float, or
# a 64-bit float if we use the default value normally picked
# by numpy.

from typing import Union, List
import numpy as np
from MDMC.common import units
from MDMC.MD.structures import Atom
from MDMC.trajectory_analysis.trajectory import TemporalConfiguration

[docs] class CompactTrajectory: """ Stores an MD trajectory in numpy arrays. The units are system units. The normal workflow for a CompactTrajectory is: 1. Create and allocate memory. traj = CompactTrajectory(n_step, n_atoms, useVelocity) or traj = CompactTrajectory() traj.preAllocate(n_step, n_atoms, useVelocity) 2. Write values into arrays. for n in frame_numbers: if not traj.validateTypes(atom_types): traj.WriteOneStep(step_number = n, time = time_value, positions = pos_values) traj.setDimensions(box_dimensions, step_number = n) 3. Add chemical element information traj.labelAtoms(atom_symbols, atom_masses) traj.setCharges(atom_charges) 4. Check internal consistency, trim arrays traj.postProcess() """ def __init__(self, n_steps: int = 0, n_atoms: int = 1, useVelocity: bool = False, bits_per_number: int = 8, **settings: dict): """ This is a bare constructor which initialises all the fields of a basic trajectory. Parameters ---------- n_steps (int, optional): Number of simulation steps in the trajectory. Defaults to 0. n_steps = 0 means that preAllocate will have to be run separately after the CompactTrajectory has been created. n_steps > 0 means that preAllocate will be run immediately in the __init__ function. n_atoms (int, optional): Number of atoms in the system. Defaults to 1. useVelocity (bool, optional): If the trajectory contains velocities, set to True to allocate an additional array for the velocity values. Defaults to False.""" # The development plan is to use the units defined here to calculate conversion factors, # and use these factors when writing the numbers into the arrays. # For now, we define the units here: self.position_unit = units.SYSTEM['LENGTH'] self.time_unit = units.SYSTEM['TIME'] self.velocity_unit = units.SYSTEM['LENGTH'] / units.SYSTEM['TIME'] self.dtype = self._get_dtype(bits_per_number) # this sets the data type to np.float64 # The underlying assumption is that the number of atoms, # and the atom types, stay CONSTANT within the trajectory, # and so we define them as header data, and not separately for every frame: self.n_atoms = n_atoms self.n_steps = n_steps self.atom_types = [] # atom types defined as numbers, following the MD engine definition self.atom_masses = [] # atom masses, floating point numbers, one for each atom self.atom_charges = [] # atom charges, floating point numbers, one for each atom # The initial value of self.dimensions is set to a number too low to be physical, # but different from 0. This way if an observable tried to calculate the Q vectors # or, more precisely, reciprocal space vectors # from an unpopulated CompactTrajectory, it would not divide by zero. self.dimensions = 0.1*np.ones(3) # avoids divide-by-zero errors, explained above. self.changing_dimensions = None self.element_list = [] # chemical element (str) defined for each atom self.element_set = set() # set of chemical elements (str) present in the trajectory # key point: the data! # this is where we will keep the numpy arrays # vvvvvvvvvvvvvvvvvv self.position = None self.velocity = None self.time = None # now some state indicators: self.is_allocated = False # preAllocate has been run, numpy arrays exist self.is_populated = False # numpy arrays are not empty; some data have been written self.is_fixedbox = True # the dimensions of the simulation box are fixed; self.has_velocity = useVelocity # ^^^^^^^^^^^^^^ # the self.is_fixedbox could potentially be False for an NPT ensemble # in that case self.changing_dimensions will contain the box dimensions for each step. self.first_index = 1e5 # the first array index at which we have written data self.last_index = 0 # the last array index at which we have written data # ^^^^^^^^^^^^^ # since we use np.empty, the elements of the array are _not_ initialised to any value, # so it is a precaution to slice the arrays at the end to cut off the empty parts. # ------------- # now we allocate the arrays # vvvvvvvvvvvvv if n_steps > 0: self.preAllocate(n_steps=n_steps, n_atoms=n_atoms, useVelocity=useVelocity) try: self.universe = settings['universe'] except KeyError: self.universe = None @staticmethod def _get_dtype(bpn: int): if bpn > 8: return np.float128 if bpn > 4: return np.float64 if bpn > 2: return np.float32 return np.float16
[docs] def setBytesPerNumber(self, bytes_per_number: int = 8): """ Changes the number of bytes per number in the arrays storing atom positions, velocities, the frame timestamps and simulation box dimensions. The best approach is to set the correct value before populating the arrays, but it is still possible to change the data type using this function when the CompactTrajectory already contains some numbers. Parameters ---------- bytes_per_number (int, optional): If 8, the arrays will use np.float64 to store the positions, velocities and time at each step. Will always be rounded up to the nearest multiple of 2.""" self.dtype = self._get_dtype(bytes_per_number) if len(self) > 0: self.time = self.time.astype(self.dtype) self.position = self.position.astype(self.dtype) if self.velocity is not None: self.velocity = self.velocity.astype(self.dtype) self.changing_dimensions = self.changing_dimensions.astype( self.dtype)
def __len__(self): if self.position is None: return 0 # Since it is not guaranteed that the postProcess method has been run, # we explicitly limit the indices to those which have been written into. # The length corresponds to the number of simulation steps. return len(self.position[self.first_index:self.last_index + 1]) def __getitem__(self, index: Union[int, slice]): # different behaviour: # a single index extracts a subrtajectory # with length 1, # while a slice with produce a subtrajectory, # which is another CompactTrajectory. try: start, stop, step = index.start, index.stop, index.step except AttributeError as exc: if index >= self.__len__(): raise IndexError("Trying to access a nonexistent time" " frame in the CompactTrajectory.") from exc return self.subtrajectory(index, index+1, 1) return self.subtrajectory(start, stop, step) def __eq__(self, other: 'CompactTrajectory') -> bool: if not self.is_allocated == other.is_allocated: return False if not self.is_populated == other.is_populated: return False if not self.is_fixedbox == other.is_fixedbox: return False if not self.has_velocity == other.has_velocity: return False are_the_same = (other.position_unit == self.position_unit and other.time_unit == self.time_unit and other.velocity_unit == self.velocity_unit and other.dtype == self.dtype) if are_the_same: are_the_same = np.all([ self.n_atoms == other.n_atoms, self.first_index == other.first_index, self.last_index == other.last_index]) else: print("Failed header comparison") if are_the_same: are_the_same = np.all([ np.allclose(other.dimensions.shape, self.dimensions.shape), np.allclose(other.position.shape, self.position.shape), np.allclose(other.time.shape, self.time.shape), np.allclose(other.changing_dimensions.shape, self.changing_dimensions.shape)]) else: print("Failed index comparison") if are_the_same: are_the_same = self.universe == other.universe else: print("Failed shape comparison") print(f"Dimensions other: {other.dimensions.shape}, self: {self.dimensions.shape}") print(f"Position other: {other.position.shape}, self: {self.position.shape}") print(f"Time other: {other.time.shape}, self: {self.time.shape}") print(f"CDim other: {other.changing_dimensions.shape}" f"self: {self.changing_dimensions.shape}") if are_the_same: if self.is_fixedbox: are_the_same = np.all([ np.allclose(other.dimensions, self.dimensions), np.allclose(other.position, self.position), np.allclose(other.time, self.time)]) else: are_the_same = np.all([ np.allclose(other.dimensions, self.dimensions), np.allclose(other.position, self.position), np.allclose(other.time, self.time), np.allclose(other.changing_dimensions, self.changing_dimensions)]) else: print("Failed universe comparison") if are_the_same: if self.velocity is not None and other.velocity is not None: are_the_same = np.allclose(self.velocity, other.velocity) else: print("Failed array comparison") if are_the_same: are_the_same = np.all([ np.all(other.atom_types == self.atom_types), other.n_atoms == self.n_atoms, np.allclose(other.atom_charges, self.atom_charges), np.allclose(other.atom_masses, self.atom_masses), other.element_list == self.element_list, other.element_set == self.element_set]) else: print("Failed velocity comparison") if not are_the_same: print(f"atom_types: {np.all(other.atom_types == self.atom_types)}") print(f"n_atoms: {other.n_atoms == self.n_atoms}") print(f"atom_charges: {np.allclose(other.atom_charges, self.atom_charges)}") print(f"atom_masses: {np.allclose(other.atom_masses, self.atom_masses)}") print(f"element_list: {other.element_list == self.element_list}") print(f"element_set: {other.element_set == self.element_set}") return are_the_same @property def velocities(self): """ Returns the velocity array. Added for those parts of code that use the plural form instead of singular. """ return self.velocity @property def times(self): """ Returns the time array. Added for those parts of code that use the plural form instead of singular. """ return self.time @property def positions(self): """ Returns the position array. Added for those parts of code that use the plural form instead of singular. """ return self.position @property def data(self): """ Returns the step numbers and time steps in a single array. """ return np.column_stack([ np.arange(self.n_steps), self.time, # there used to be a TemporalConfiguration here as well. ])
[docs] def preAllocate(self, n_steps: int = 1, n_atoms: int = 1, useVelocity: bool = False): """ Creates empty arrays for storing atom positions, velocities, time values of the simulation frames, and the simulation box dimensions. The numpy empty object do not initialise elements until they have been accessed by a read or write operation. This means that allocating too many steps is generally safe and harmless, as the unused steps will be removed when the postProcess method is called. Parameters ---------- n_steps (int, optional): Number of simulation steps in the trajectory. Defaults to 1. n_atoms (int, optional): Number of atoms in the system. Defaults to 1. useVelocity (bool, optional): If the trajectory contains velocities, set to True to allocate an additional array for the velocity values. Defaults to False. """ if self.is_allocated: print("WARNING: preAllocate has already been run on this CompactTrajectory.") self.n_atoms = n_atoms # from now on we decide that each step will have n_atoms self.n_steps = n_steps # and there will be n_steps in this trajectory. # time steps are the first dimension of the arrays. shape = (n_steps, n_atoms, 3) self.time = np.empty(n_steps, dtype=self.dtype) self.position = np.empty(shape, dtype=self.dtype) if useVelocity: self.velocity = np.empty(shape, dtype=self.dtype) # First change of state: the arrays have been allocated. self.is_allocated = True self.changing_dimensions = np.empty((n_steps, 3), dtype=self.dtype)
# ^^^^^^^^^^^^^^^^^^^^^^ # We are fairly confident that the dimensions of the simulation box will NOT change # during the simulation, but allocating this array does not really cost us anything, # and, if it turns out that the dimensions do change, we will be prepared.
[docs] def setCharge(self, charge_list: List[float] = None): """ Sets the values of partial charge for each atom. It assumes that the charges are constant within the simulation. Parameters ---------- charge_list : list[float] Fractional electrical charge of each atom, given in a list with one floating point number per atom. """ # If need be, we can modify it to work the same # as setDimensions, so it populates a time-dependent # array if it turns out that the charge values # change between the time frames. if charge_list is None: return False if not len(charge_list) == self.n_atoms: raise ValueError("Wrong number of charges in setCharge.") self.atom_charges = np.array(charge_list) return True
[docs] def setDimensions(self, frame_dimensions: np.array = None, step_num: int = -1): """ Writes the simulation box dimensions into the object header. Additionally, if the simulation box dimensions change with time, it will keep track of the new dimensions at each step in the self.changing_dimensions object. Parameters --------- frame_dimensions : np.array 3 float numbers defining the size of the simulation box along x, y and z. step_num : int The number of the simulation frame at which the frame_dimensions array was read. """ # The initial self.dimensions is set to 0.1 Angstrom. # This is too small to ever be used in a simulation, and at the same time # it is not 0. # Here we check if the initial values has already been overwritten with # a physical value of dimensions. if np.all(np.abs(self.dimensions - 0.1) < 1e-5): self.dimensions = frame_dimensions self.changing_dimensions[0] = frame_dimensions elif np.allclose(frame_dimensions, self.dimensions, rtol=1e-6, atol=1e-4): # If the new dimensions are the same as the current ones, we do nothing. pass else: self.is_fixedbox = False # the dimensions of the box DO change in this trajectory! # we save the dimensions self.changing_dimensions[step_num] = frame_dimensions # per simulation step now. self.dimensions = self.changing_dimensions[self.first_index:self.last_index+1].mean( 0)
# ^^^^^^^^^^^^^ # now that we have discovered that the dimensions change, # we use the mean value over time as the dimensions parameter.
[docs] def writeOneStep(self, step_num: int = -1, time: float = -1.0, positions: np.array = None, velocities: np.array = None): """ Writes the atom positions (and, optionally, velocities) of a single simulation frame into the trajectory arrays. This function assumes that we have allocated the memory already, and that we have sorted the atoms according to their IDs. It will then put the numbers into the arrays at the correct frame, given by the step_num parameter. Parameters ---------- step_num : int, optional the index at which the numbers will be written. Defaults to -1. time : float the time stamp of the simulation step, in the correct time units (femtoseconds). Defaults to -1.0. positions : np.array the array of the atom positions, shaped (n_atoms, 3). Defaults to None. velocities : np.array, optional the array of the atom velocities, shaped (n_atoms, 3). If we don't use velocities, it can be skipped. Defaults to None. """ if not self.is_allocated: raise IndexError("Writing outside of the reserved array range.") self.time[step_num] = time self.position[step_num, :, :] = positions if velocities is not None: self.velocity[step_num, :, :] = velocities # some housekeeping: # we take note of the indices that have been written to. # In case we allocated more memory than needed, # we keep track of the indices where the data has been written, # so we can discard the unused part later, # as the uninitalised part of the array will contain random numbers. self.first_index = min(step_num, self.first_index) self.last_index = max(step_num, self.last_index)
[docs] def writeEmptyStep(self, step_num: int = -1, time: float = -1.0): """ This function advances the iterators without writing any data. It is basically a reduced version of the writeOneStep method. This was added since the tests/trajectory_analysis/test_PDF.py use a trajectory made of 1000 _empty_ configurations. Parameters ---------- step_num : int, optional the index at which the numbers will be written. Defaults to -1. time : float, optional the time stamp of the simulation step, in the correct time units (femtoseconds). Defaults to -1.0. """ if not self.is_allocated: raise IndexError("Writing outside of the reserved array range.") self.time[step_num] = time self.first_index = min(step_num, self.first_index) self.last_index = max(step_num, self.last_index)
def _create_types_from_elements(self, atom_types: List[str]): """ Some engines (e.g. LAMMPS) like using numbers at atom types, while others use letters. CompactTrajectory was written for LAMMPS initially, and likes having numbers for types. This function will check if the types are numbers, and replace them with consitent positive numbers if it turns out that they were not. """ if len(atom_types) == 0: return np.array([]) try: int(atom_types[0]) except ValueError: all_types = np.unique(atom_types) compare_types = np.array(atom_types, dtype=str) number_types = np.zeros(len(atom_types), dtype=np.int64) for number, at_type in enumerate(all_types): number_types[np.where(compare_types == at_type)] = number +1 return number_types return np.array(atom_types)
[docs] def validateTypes(self, raw_atom_types: np.array): """ This function either sets the array of atom types (if it had not been set before), or checks if the array of atom types from the new frame is the same as the original array of atom types. If the atom types have changed during the simulation, we cannot process the results using the CompactTrajectory object, and the validation will return False. It is up to the engine facade to decide what to do if valideTypes returns False. Parameters ---------- atom_types : np.array an array of all the atom types, sorted by the atom ID. If these are numbers, like in a LAMMPS simulation, then they will be stored directly. If raw_atom_types are strings, an internal method called _create_types_from_elements will create numbers that correspond to the strings, so that every atom type has its own number. Returns ------- bool True if the atom types are the same as in the beginning, False otherwise. """ # codes like LAMMPS make it possible to generate or destroy atoms/particles # in a simulation to simulate flow. It is unlikely to happen in an MDMC run. # Since we cannot handle such a case, # we will return False to indicate that this trajectory is not suitable for MDMC. atom_types = self._create_types_from_elements(raw_atom_types) # ^^^^^^^^ # This ensures that the atom types are numers. if len(self.atom_types) == 0: # case 1: atom_types have not been set if len(atom_types) == self.n_atoms: self.atom_types = atom_types # and now they are set. return True elif np.all(self.atom_types == atom_types): # case 2: atom_types have been set return True # and have not changed return False # case 3: atom_types have been set and have changed.
[docs] def labelAtoms(self, atom_symbols: dict = None, atom_masses: dict = None): """ Creates a list of chemical elements of all the atoms in a trajectory frame, a set of all the chemical elements present in a trajectory, and a list of atom masses of all the atoms in a trajectory frame. It is up to the engine facade to construct the input dictionaries containing the correct information about the chemical elements and masses. Parameters ---------- atom_symbols : dict a dictionary which returns a 'str' chemical element symbol for every 'int' atom_ID from the trajectory. atom_masses : dict a dictionary of 'float' atom masses, using as keys the 'int' atom_ID values. """ if len(self.atom_types) == 0: return False # tests/trajectory_analysis/test_PDF.py use empty Configurations # so we accept a case of no atoms in the CompactTrajectory. # We just return False in case we wanted to check in real code if # we are trying to set labels on a CompactTrajectory with no atoms. self.element_list = [str(atom_symbols[atom_id]) for atom_id in self.atom_types] self.element_set = set(self.element_list) self.atom_masses = np.array([atom_masses[atom_id] for atom_id in self.atom_types]) return True
[docs] def postProcess(self): """ This function should be called after the all the trajectory steps have been read. It will discard the unnecessary rows of the arrays, in case we had allocated too many due to some rounding error. """ if not self.is_populated: self.position = self.position[self.first_index:self.last_index+1] self.time = self.time[self.first_index:self.last_index+1] if self.velocity is not None: self.velocity = self.velocity[self.first_index:self.last_index+1] self.changing_dimensions = self.changing_dimensions[self.first_index:self.last_index+1] self.first_index = 0 self.last_index = len(self.position)-1 self.is_populated = True
[docs] def subtrajectory(self, start: int = 0, stop: int = -1, step: int = 1, atom_filter: List = None): """ Returns another CompactTrajectory instance, which contains the same header information, and a subset of the original trajectory steps. The arrays in the original trajectory will be sliced following the pattern: new = old[start:stop:step]. Optionally, atom_filter can be specified to choose only specific atoms from the trajectory. It is recommended not to use subtrajectory directly in this case, but to use the filter_by_element or filter_by_type methods, which will create the element_filter themselves. Parameters ---------- start : int number defining the beginning of the slicing range. stop : int number defining the end of the slicing range. step : int step size of the slicing operation Returns ------- CompactTrajectory: a trajectory containing the same header and the same or less steps than the original. """ self.postProcess() temp = CompactTrajectory() # copy over all the transferable parts temp.position_unit = self.position_unit temp.time_unit = self.time_unit temp.velocity_unit = self.velocity_unit temp.dtype = self.dtype temp.dimensions = self.dimensions temp.universe = self.universe if atom_filter is None: temp.position = self.position[start:stop:step, :, :] temp.time = self.time[start:stop:step] temp.changing_dimensions = self.changing_dimensions[start:stop:step, :] if self.velocity is not None: temp.velocity = self.velocity[start:stop:step, :, :] temp.n_atoms = self.n_atoms temp.atom_types = self.atom_types.copy() temp.atom_masses = self.atom_masses.copy() temp.atom_charges = self.atom_charges.copy() temp.element_list = self.element_list temp.element_set = self.element_set else: temp.position = self.position[start:stop:step, atom_filter, :] temp.time = self.time[start:stop:step] temp.changing_dimensions = self.changing_dimensions[start:stop:step, :] if self.velocity is not None: temp.velocity = self.velocity[start:stop:step, atom_filter, :] temp.atom_types = self.atom_types[atom_filter] temp.n_atoms = len(temp.atom_types) temp.atom_charges = self.atom_charges[atom_filter] temp.atom_masses = self.atom_masses[atom_filter] temp.element_list = [self.element_list[x] for x in atom_filter] temp.element_set = set(temp.element_list) temp.is_allocated = True temp.is_populated = True temp.has_velocity = self.has_velocity temp.is_fixedbox = self.is_fixedbox temp.first_index = 0 temp.last_index = len(temp.position)-1 temp.n_steps = len(temp.position) temp.postProcess() return temp
[docs] def filter_by_time(self, start, end=None): """ Create another CompactTrajectory, containing only the frames with the time values equal to or larger than the start parameter, and smaller than the end parameter. If only one time value is given, the function will create a CompactTrajectory with a single time step equal to start, or raise and error if start is not an element of the time array. Parameters ---------- start : float The start time for filtering the ``Trajectory`` end : , optional The end time for filtering the ``Trajectory``. The default is `None`, which means the new returned ``Trajectory`` has a single time, defined by the ``start`` Returns ------- CompactTrajectory A ``CompactTrajectory`` with ``times`` in half open interval defined by ``start`` and ``end`` """ index = np.where(self.time == start)[0].ravel() if end is None: if len(index) < 1: raise ValueError("Start is not in self.time") return self.subtrajectory(index[0], index[0]+1) total = np.where(np.logical_and( self.time >= start, self.time < end))[0].ravel() if len(total) < 1: raise ValueError("The specified time range contains no MD frames") return self.subtrajectory(index[0], len(total))
[docs] def filter_by_element(self, elements: List[str]): """ Create a subtrajectory out of the original CompactTrajectory, containing only the chemical elements specified in the input element list. Parameters ---------- elements : list(str) List of chemical elements given as string. Returns ------- CompactTrajectory A ``CompactTrajectory`` containing only the atoms of the specified chemical elements. """ indices = [] for element in elements: if element in self.element_list: index = np.where(np.array(self.element_list) == element)[0].ravel() indices.append(index) index = np.concatenate(indices) index = np.sort(index) return self.subtrajectory(0, len(self), step = 1, atom_filter = index)
[docs] def filter_by_type(self, types: List[int]): """ Create a subtrajectory out of the original CompactTrajectory, containing only the atoms with the atom_type specified in the input types list. Parameters ---------- types : List(int) A list of atom_type numbers. Returns ------- CompactTrajectory A ``CompactTrajectory`` containing only the atoms of the specified type. """ indices = [] for atom_type in types: if atom_type in self.atom_types: index = np.where(self.atom_types == atom_type)[0].ravel() indices.append(index) index = np.concatenate(indices) index = np.sort(index) return self.subtrajectory(0, len(self), step = 1, atom_filter = index)
[docs] def exportAtom(self, step_number: int = 0, atom_number: int = 0): """ For compatibility with ``Trajectory``, creates an ``Atom`` object for a chosen time step of the simulation and atom number. Parameters ---------- step_number : int number of the simulation step at which the atom position should be read atom_number : int number of the atom in the trajectory that will be created as an ``Atom`` object Returns ------- Atom a single ``Atom`` object, for whatever purpose the user may need it. """ try: element = self.element_list[atom_number] except AttributeError: element = '?' try: velocity = self.velocity[step_number, atom_number, :] except AttributeError: velocity = (0.0, 0.0, 0.0) try: charge = self.atom_charges[atom_number] except IndexError: charge = 0.0 return Atom(element, self.position[step_number, atom_number, :], velocity, charge=charge)
[docs] def configurations_as_compact_trajectory(*configs: List[TemporalConfiguration])->CompactTrajectory: """ Populate the arrays of the CompactTrajectory using the input list of ``TemporalConfiguration`` objects. Arguments --------- configs -- A list of TemporalConfiguration objects, most likely containing atoms Returns ------- A CompactTrajectory with atom types and positions matching those in the input configurations. """ if len(configs) < 1: raise TypeError("At least one Configuration is needed" " for the CompactTrajectory.fromConfigs()") traj = CompactTrajectory(n_steps=len(configs), n_atoms=len(configs[0].atoms), useVelocity=len(configs[0].atom_velocities) > 0, universe = configs[0].universe) for step_number, config in enumerate(configs): try: current_time = config.time except AttributeError: current_time = 0.0 if len(config.data) > 0: atpos = np.vstack(config.atom_positions) atvel = np.vstack(config.atom_velocities) traj.writeOneStep(step_num=step_number, time=current_time, positions=atpos, velocities=atvel) else: traj.writeEmptyStep(step_num=step_number, time=current_time) try: dim = config.universe.dimensions except AttributeError: continue else: traj.setDimensions(dim, step_num=step_number) # here we extract as much header information as possible # from the TemporalConfiguration elements = [] # list of 'chemical_element_symbol', 1 per atom # dictionary of 'chemical_element_symbol' : mass (float) in a.m.u. masses = {} # dictionary of 'atom_type' : mass (float) in a.m.u. mass_per_type = {} element_per_type = {} # dictionary of 'atom_type' : 'chemical_element_symbol' types = [] # a list of 'atom_type', 1 entry for each atom id_values = [] # a list of 'atom_ID', 1 entry for each atom atom_charge = [] # we assume -for now- that the Trajectory stores atom_type. has_types = True atom_counter = 0 # we just iterate over Atom objects for nat, atom in enumerate(configs[0].atoms): element = atom.element.symbol mass = atom.mass charge = atom.charge elements.append(element) atom_charge.append(charge) masses[element] = mass if has_types: try: types.append(atom.atom_type) except AttributeError: has_types = False id_values.append(atom.ID) atom_counter = nat + 1 if not has_types: all_elements = sorted(list(np.unique(elements))) types = [all_elements.index(x) for x in elements] for x in range(atom_counter): mass_per_type[types[x]] = masses[elements[x]] element_per_type[types[x]] = elements[x] traj.validateTypes(np.array(types)[np.argsort(id_values)]) traj.labelAtoms(element_per_type, mass_per_type) traj.setCharge(atom_charge) traj.postProcess() return traj