__all__ = ['Trajectory', 'WindowIterator']
import numpy as np
from collections import deque
from itertools import islice, chain
from os.path import isfile
from typing import Callable, Dict, Union, List
from dynasor.trajectory.atomic_indices import parse_gromacs_index_file
from dynasor.trajectory.ase_trajectory_reader import ASETrajectoryReader
from dynasor.trajectory.extxyz_trajectory_reader import ExtxyzTrajectoryReader
from dynasor.trajectory.lammps_trajectory_reader import LammpsTrajectoryReader
from dynasor.trajectory.mdanalysis_trajectory_reader import MDAnalysisTrajectoryReader
from dynasor.trajectory.trajectory_frame import TrajectoryFrame
from dynasor.logging_tools import logger
[docs]class Trajectory:
"""Instances of this class hold trajectories in a format suitable for
the computation of correlation functions. They behave as
iterators, where each step returns the next frame as a
:class:`TrajectoryFrame` object. The latter hold information
regarding atomic positions, types, and velocities.
Name of input file
Type of trajectory. Possible values are:
``'lammps_internal'``, ``'extxyz'`` or one of the formats supported by
`MDAnalysis <https://www.mdanalysis.org/>`_ (except for ``'lammpsdump'``,
which can be called by specifying ``'lammps_mdanalysis'`` to avoid ambiguity)
Specify which indices belong to which atom type. Can be
(1) a dictionary where the keys specicy species and the values are list of atomic indices,
(2) ``'read_from_trajectory'``, in which case the species are read from the trajectory or
(3) the path to an index file.
Length unit of trajectory. Necessary for correct conversion to internal dynasor units if
the trajectory file does not contain unit information. For available options see
`MDAnalysis <https://docs.mdanalysis.org/stable/documentation_pages/units.html>`__.
Time unit of trajectory. Necessary for correct conversion to internal dynasor units if
the trajectory file does not contain unit information. For available options see
`MDAnalysis <https://docs.mdanalysis.org/stable/documentation_pages/units.html>`__.
First frame to read; must be larger or equal ``0``.
Last frame to read. By default (``None``) the entire trajectory is read.
Read every :attr:`frame_step`-th step of the input trajectory.
By default (``1``) every frame is read. Must be larger than ``0``.
def __init__(
filename: str,
trajectory_format: str,
atomic_indices: Union[str, Dict[str, List[int]]] = None,
length_unit: str = None,
time_unit: str = None,
frame_start: int = 0,
frame_stop: int = None,
frame_step: int = 1
if frame_start < 0:
raise ValueError('frame_start should be positive')
if frame_step < 0:
raise ValueError('frame_step should be positive')
self._frame_number = frame_start
self._frame_step = frame_step
self._frame_stop = frame_stop
# setup trajectory reader
if not isfile(filename):
raise IOError(f'File {filename} does not exist')
self._filename = filename
if trajectory_format == 'lammps_internal':
reader = LammpsTrajectoryReader
elif trajectory_format == 'extxyz':
reader = ExtxyzTrajectoryReader
elif trajectory_format == 'lammps_mdanalysis':
reader = MDAnalysisTrajectoryReader
trajectory_format = 'lammpsdump'
elif trajectory_format == 'ase':
reader = ASETrajectoryReader
elif trajectory_format == 'lammps':
raise IOError('Ambiguous trajectory format, '
'did you mean lammps_internal or lammps_mdanalysis?')
reader = MDAnalysisTrajectoryReader
logger.debug(f'Using trajectory reader: {reader.__name__}')
if reader == MDAnalysisTrajectoryReader:
self._reader_obj = reader(self._filename, trajectory_format,
length_unit=length_unit, time_unit=time_unit)
self._reader_obj = reader(self._filename, length_unit=length_unit, time_unit=time_unit)
# Get two frames to set cell etc.
frame0 = next(self._reader_obj)
frame1 = next(self._reader_obj)
self._cell = frame0.cell
self._n_atoms = frame0.n_atoms
# Make sure cell is not changed during consecutive frames
if not np.allclose(frame0.cell, frame1.cell):
raise ValueError('The cell changes between the first and second frame. '
'The concept of q-points becomes muddy if the simulation cell is '
'changing, such as during NPT MD simulations, so trajectories where '
'the cell changes are not supported by dynasor.')
# setup iterator slice (reuse frame0 and frame1 via chain)
self._reader_obj = islice(chain([frame0, frame1], self._reader_obj),
frame_start, self._frame_stop, self._frame_step)
# setup atomic indices
if atomic_indices is None: # Default behaviour
atomic_indices = {'X': np.arange(0, self.n_atoms)}
elif isinstance(atomic_indices, str): # Str input
if atomic_indices == 'read_from_trajectory':
if frame0.atom_types is None:
raise ValueError('Could not read atomic indices from the trajectory.')
uniques = np.unique(frame0.atom_types)
atomic_indices = {uniques[i]: (frame0.atom_types == uniques[i]).nonzero()[0]
for i in range(len(uniques))}
atomic_indices = parse_gromacs_index_file(atomic_indices)
elif isinstance(atomic_indices, dict): # Dict input
raise ValueError('Could not understand atomic_indices.')
self._atomic_indices = atomic_indices
# sanity checks for atomic_indices
for key, indices in self._atomic_indices.items():
if np.max(indices) > self.n_atoms:
raise ValueError('maximum index in atomic_indices exceeds number of atoms')
if np.min(indices) < 0:
raise ValueError('minimum index in atomic_indices is negative')
if '_' in key:
# Since '_' is what we use to distinguish atom types in the results, e.g. Sqw_Cs_Pb
raise ValueError('The char "_" is not allowed in atomic_indices.')
# log info on trajectory and atom types etc
logger.info(f'Trajectory file: {self.filename}')
logger.info(f'Total number of particles: {self.n_atoms}')
logger.info(f'Number of atom types: {len(self.atom_types)}')
for atom_type, indices in self._atomic_indices.items():
logger.info(f'Number of atoms of type {atom_type}: {len(indices)}')
logger.info(f'Simulation cell (in Angstrom):\n{str(self._cell)}')
def __iter__(self):
return self
def __next__(self):
frame = next(self._reader_obj)
new_frame = TrajectoryFrame(self.atomic_indices, frame.frame_index, frame.positions,
return new_frame
def __str__(self) -> str:
s = ['Trajectory']
s += ['{:12} : {}'.format('filename', self.filename)]
s += ['{:12} : {}'.format('natoms', self.n_atoms)]
s += ['{:12} : {}'.format('frame_step', self.frame_step)]
s += ['{:12} : [{}\n {}\n {}]'
.format('cell', self.cell[0], self.cell[1], self.cell[2])]
return '\n'.join(s)
def _repr_html_(self) -> str:
s = [f'<h3>{self.__class__.__name__}</h3>']
s += ['<table border="1" class="dataframe">']
s += ['<thead><tr><th style="text-align: left;">Field</th><th>Value</th></tr></thead>']
s += ['<tbody>']
s += [f'<tr"><td style="text-align: left;">File name</td><td>{self.filename}</td></tr>']
s += [f'<tr><td style="text-align: left;">Number of atoms</td><td>{self.n_atoms}</td></tr>']
s += [f'<tr><td style="text-align: left;">Cell metric</td><td>{self.cell}</td></tr>']
s += [f'<tr><td style="text-align: left;">Frame step</td><td>{self.frame_step}</td></tr>']
s += [f'<tr><td style="text-align: left;">Atom types</td><td>{self.atom_types}</td></tr>']
s += ['</tbody>']
s += ['</table>']
return '\n'.join(s)
def cell(self):
""" Simulation cell """
return self._cell
def n_atoms(self):
""" Number of atoms """
return self._n_atoms
def filename(self):
""" The trajectory filename """
return self._filename
def atomic_indices(self):
""" Return copy of index arrays """
atomic_indices = dict()
for name, inds in self._atomic_indices.items():
atomic_indices[name] = inds.copy()
return atomic_indices
def atom_types(self) -> List[str]:
return sorted(self._atomic_indices.keys())
def frame_step(self):
""" Frame to access, trajectory will return every :attr:`frame_step`-th snapshot """
return self._frame_step
def consume(iterator, n):
""" Advance the iterator by :attr:`n` steps. If :attr:`n` is ``None``, consume entirely. """
# From the python.org
if n is None:
deque(iterator, maxlen=0)
next(islice(iterator, n, n), None)
class WindowIterator:
"""Sliding window iterator.
Returns consecutive windows (a window is represented as a list
of objects), created from an input iterator.
Trajectory object
Length of window (``window_size`` + 1)
Distance between the start of two consecutive window frames
Enables processing each non-discarded object; useful if ``window_step >
width`` and ``map_item`` is expensive (as compared to directly passing
``map(fun, itraj)`` as ``itraj``); if ``window_step < width``, you could as
well directly pass ``map(fun, itraj)``.
def __init__(self,
itraj: Trajectory,
width: int,
window_step: int = 1,
element_processor: Callable = None):
self._raw_it = itraj
if element_processor:
self._it = map(element_processor, self._raw_it)
self._it = self._raw_it
assert window_step >= 1
assert width >= 1
self.width = width
self.window_step = window_step
self._window = None
def __iter__(self):
return self
def __next__(self):
""" Returns next element in sequence. """
if self._window is None:
self._window = deque(islice(self._it, self.width), self.width)
if self.window_step >= self.width:
consume(self._raw_it, self.window_step - self.width)
for _ in range(min((self.window_step, len(self._window)))):
for f in islice(self._it, min((self.window_step, self.width))):
if len(self._window) == 0:
raise StopIteration
return list(self._window)