Source code for dynasor.trajectory.trajectory

__all__ = ['Trajectory', 'WindowIterator']

from collections import deque
from itertools import islice, chain
from os.path import isfile
from typing import Callable, Optional, Union

import numpy as np
from numpy.typing import NDArray

from dynasor.trajectory.atomic_indices import parse_gromacs_index_file
from dynasor.trajectory.ase_trajectory_reader import ASETrajectoryReader
from dynasor.trajectory.extxyz_trajectory_reader import ExtxyzTrajectoryReader
from dynasor.trajectory.lammps_trajectory_reader import LammpsTrajectoryReader
from dynasor.trajectory.mdanalysis_trajectory_reader import MDAnalysisTrajectoryReader
from dynasor.trajectory.trajectory_frame import TrajectoryFrame
from dynasor.logging_tools import logger


[docs] class Trajectory: """Instances of this class hold trajectories in a format suitable for the computation of correlation functions. They behave as iterators, where each step returns the next frame as a :class:`TrajectoryFrame` object. The latter hold information regarding atomic positions, types, and velocities. Parameters ---------- filename Name of input file. trajectory_format Type of trajectory. Possible values are: ``'lammps_internal'``, ``'extxyz'``, ``'ase'`` or one of the formats supported by `MDAnalysis <https://www.mdanalysis.org/>`_ (except for ``'lammpsdump'``, which can be called by specifying ``'lammps_mdanalysis'`` to avoid ambiguity) atomic_indices Specify which indices belong to which atom type. Can be (1) a dictionary where the keys specify the species and the values are a list of atomic indices, (2) ``'read_from_trajectory'``, in which case the species are read from the trajectory or (3) the path to a gromacs index file. length_unit Length unit of trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``). Necessary for correct conversion to internal dynasor units if the trajectory file does not contain unit information. If no length unit is specified and the reader cannot read units from the trajectory, Angstrom is assumed. time_unit Time unit of trajectory (``'fs'``, ``'ps'``, ``'ns'``). Necessary for correct conversion to internal dynasor units if the trajectory file does not contain unit information. If no time unit is specified and the reader cannot read units from the trajectory, fs is assumed. frame_start First frame to read; must be larger or equal ``0``. frame_stop Last frame to read. By default (``None``) the entire trajectory is read. frame_step Read every :attr:`frame_step`-th step of the input trajectory. By default (``1``) every frame is read. Must be larger than ``0``. """ def __init__( self, filename: str, trajectory_format: str, atomic_indices: Optional[Union[str, dict[str, list[int]]]] = None, length_unit: Optional[str] = None, time_unit: Optional[str] = None, frame_start: Optional[int] = 0, frame_stop: Optional[int] = None, frame_step: Optional[int] = 1 ): if frame_start < 0: raise ValueError('frame_start should be positive') if frame_step < 0: raise ValueError('frame_step should be positive') self._frame_start = frame_start self._frame_step = frame_step self._frame_stop = frame_stop # setup trajectory reader if not isfile(filename): raise IOError(f'File {filename} does not exist') self._filename = filename if trajectory_format == 'lammps_internal': reader = LammpsTrajectoryReader elif trajectory_format == 'extxyz': reader = ExtxyzTrajectoryReader elif trajectory_format == 'lammps_mdanalysis': reader = MDAnalysisTrajectoryReader trajectory_format = 'lammpsdump' elif trajectory_format == 'ase': reader = ASETrajectoryReader elif trajectory_format == 'lammps': raise IOError('Ambiguous trajectory format, ' 'did you mean lammps_internal or lammps_mdanalysis?') else: reader = MDAnalysisTrajectoryReader logger.debug(f'Using trajectory reader: {reader.__name__}') if reader == MDAnalysisTrajectoryReader: self._reader_obj = reader(self._filename, trajectory_format, length_unit=length_unit, time_unit=time_unit) else: self._reader_obj = reader(self._filename, length_unit=length_unit, time_unit=time_unit) # Get two frames to set cell etc. frame0 = next(self._reader_obj) frame1 = next(self._reader_obj) self._cell = frame0.cell self._n_atoms = frame0.n_atoms # Make sure cell is not changed during consecutive frames if not np.allclose(frame0.cell, frame1.cell): raise ValueError('The cell changes between the first and second frame. ' 'The concept of q-points becomes muddy if the simulation cell is ' 'changing, such as during NPT MD simulations, so trajectories where ' 'the cell changes are not supported by dynasor.') # setup iterator slice (reuse frame0 and frame1 via chain) self.number_of_frames_read = 0 self.current_frame_index = 0 self._reader_iter = islice(chain([frame0, frame1], self._reader_obj), self._frame_start, self._frame_stop, self._frame_step) # setup atomic indices if atomic_indices is None: # Default behaviour atomic_indices = {'X': np.arange(0, self.n_atoms)} elif isinstance(atomic_indices, str): # Str input if atomic_indices == 'read_from_trajectory': if frame0.atom_types is None: raise ValueError('Could not read atomic indices from the trajectory.') else: uniques = np.unique(frame0.atom_types) atomic_indices = {str(uniques[i]): (frame0.atom_types == uniques[i]).nonzero()[0] for i in range(len(uniques))} else: atomic_indices = parse_gromacs_index_file(atomic_indices) elif isinstance(atomic_indices, dict): # dict input pass else: raise ValueError('Could not understand atomic_indices.') self._atomic_indices = atomic_indices # sanity checks for atomic_indices for key, indices in self._atomic_indices.items(): if np.max(indices) > self.n_atoms: raise ValueError('maximum index in atomic_indices exceeds number of atoms') if np.min(indices) < 0: raise ValueError('minimum index in atomic_indices is negative') if '_' in key: # Since '_' is what we use to distinguish atom types in the results, e.g. Sqw_Cs_Pb raise ValueError('The char "_" is not allowed in atomic_indices.') # log info on trajectory and atom types etc logger.info(f'Trajectory file: {self.filename}') logger.info(f'Total number of particles: {self.n_atoms}') logger.info(f'Number of atom types: {len(self.atom_types)}') for atom_type, indices in self._atomic_indices.items(): logger.info(f'Number of atoms of type {atom_type}: {len(indices)}') logger.info(f'Simulation cell (in Angstrom):\n{str(self._cell)}') def __iter__(self): return self def __next__(self): frame = next(self._reader_iter) logger.debug(f'Read frame #{frame.frame_index}') new_frame = TrajectoryFrame( self.atomic_indices, frame.frame_index, frame.positions, frame.velocities) self.number_of_frames_read += 1 self.current_frame_index = frame.frame_index return new_frame def __str__(self) -> str: s = ['Trajectory'] s += ['{:12} : {}'.format('filename', self.filename)] s += ['{:12} : {}'.format('natoms', self.n_atoms)] s += ['{:12} : {}'.format('frame_start', self._frame_start)] s += ['{:12} : {}'.format('frame_stop', self._frame_stop)] s += ['{:12} : {}'.format('frame_step', self.frame_step)] s += ['{:12} : {}'.format('frame_index', self.current_frame_index)] s += ['{:12} : [{}\n {}\n {}]' .format('cell', self.cell[0], self.cell[1], self.cell[2])] return '\n'.join(s) def __repr__(self) -> str: return str(self) def _repr_html_(self) -> str: s = [f'<h3>{self.__class__.__name__}</h3>'] s += ['<table border="1" class="dataframe">'] s += ['<thead><tr><th style="text-align: left;">Field</th><th>Value</th></tr></thead>'] s += ['<tbody>'] s += [f'<tr"><td style="text-align: left;">File name</td><td>{self.filename}</td></tr>'] s += [f'<tr><td style="text-align: left;">Number of atoms</td><td>{self.n_atoms}</td></tr>'] s += [f'<tr><td style="text-align: left;">Cell metric</td><td>{self.cell}</td></tr>'] s += [f'<tr><td style="text-align: left;">Frame step</td><td>{self.frame_step}</td></tr>'] s += [f'<tr><td style="text-align: left;">Atom types</td><td>{self.atom_types}</td></tr>'] s += ['</tbody>'] s += ['</table>'] return '\n'.join(s) @property def cell(self) -> NDArray[float]: """ Simulation cell """ return self._cell @property def n_atoms(self) -> int: """ Number of atoms """ return self._n_atoms @property def filename(self) -> str: """ The trajectory filename """ return self._filename @property def atomic_indices(self) -> dict[str, list[int]]: """ Return copy of index arrays """ atomic_indices = dict() for name, inds in self._atomic_indices.items(): atomic_indices[name] = inds.copy() return atomic_indices @property def atom_types(self) -> list[str]: return sorted(self._atomic_indices.keys()) @property def frame_step(self) -> int: """ Frame to access, trajectory will return every :attr:`frame_step`-th snapshot. """ return self._frame_step
def consume(iterator, n): """ Advance the iterator by :attr:`n` steps. If :attr:`n` is ``None``, consume entirely. """ # From python.org if n is None: deque(iterator, maxlen=0) else: next(islice(iterator, n, n), None) class WindowIterator: """Sliding window iterator. Returns consecutive windows (a window is represented as a list of objects), created from an input iterator. Parameters ---------- itraj Trajectory object. width Length of window (``window_size`` + 1). window_step Distance between the start of two consecutive window frames. element_processor Enables processing each non-discarded object; useful if ``window_step > width`` and ``map_item`` is expensive (as compared to directly passing ``map(fun, itraj)`` as ``itraj``); if ``window_step < width``, you could as well directly pass ``map(fun, itraj)``. """ def __init__(self, itraj: Trajectory, width: int, window_step: Optional[int] = 1, element_processor: Optional[Callable] = None): self._raw_it = itraj if element_processor: self._it = map(element_processor, self._raw_it) else: self._it = self._raw_it assert window_step >= 1 assert width >= 1 self.width = width self.window_step = window_step self._window = None def __iter__(self): return self def __next__(self): """ Returns next element in sequence. """ if self._window is None: self._window = deque(islice(self._it, self.width), self.width) else: if self.window_step >= self.width: self._window.clear() consume(self._raw_it, self.window_step - self.width) else: for _ in range(min((self.window_step, len(self._window)))): self._window.popleft() for f in islice(self._it, min((self.window_step, self.width))): self._window.append(f) if len(self._window) == 0: raise StopIteration return list(self._window)