Coverage for local_installation/dynasor/trajectory/trajectory.py: 94%
158 statements
« prev ^ index » next coverage.py v7.3.2, created at 2025-04-16 06:13 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2025-04-16 06:13 +0000
1__all__ = ['Trajectory', 'WindowIterator']
3import numpy as np
5from collections import deque
6from itertools import islice, chain
7from os.path import isfile
8from typing import Callable, Dict, Union, List
10from dynasor.trajectory.atomic_indices import parse_gromacs_index_file
11from dynasor.trajectory.ase_trajectory_reader import ASETrajectoryReader
12from dynasor.trajectory.extxyz_trajectory_reader import ExtxyzTrajectoryReader
13from dynasor.trajectory.lammps_trajectory_reader import LammpsTrajectoryReader
14from dynasor.trajectory.mdanalysis_trajectory_reader import MDAnalysisTrajectoryReader
15from dynasor.trajectory.trajectory_frame import TrajectoryFrame
16from dynasor.logging_tools import logger
19class Trajectory:
20 """Instances of this class hold trajectories in a format suitable for
21 the computation of correlation functions. They behave as
22 iterators, where each step returns the next frame as a
23 :class:`TrajectoryFrame` object. The latter hold information
24 regarding atomic positions, types, and velocities.
26 Parameters
27 ----------
28 filename
29 Name of input file
30 trajectory_format
31 Type of trajectory. Possible values are:
32 ``'lammps_internal'``, ``'extxyz'``, ``'ase'`` or one of the formats supported by
33 `MDAnalysis <https://www.mdanalysis.org/>`_ (except for ``'lammpsdump'``,
34 which can be called by specifying ``'lammps_mdanalysis'`` to avoid ambiguity)
35 atomic_indices
36 Specify which indices belong to which atom type. Can be
37 (1) a dictionary where the keys specicy species and the values are list of atomic indices,
38 (2) ``'read_from_trajectory'``, in which case the species are read from the trajectory or
39 (3) the path to an index file.
40 length_unit
41 Length unit of trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``). Necessary for
42 correct conversion to internal dynasor units if the trajectory file does not contain unit
43 information.
44 time_unit
45 Time unit of trajectory (``'fs'``, ``'ps'``, ``'ns'``). Necessary for correct conversion to
46 internal dynasor units if the trajectory file does not contain unit information.
47 frame_start
48 First frame to read; must be larger or equal ``0``.
49 frame_stop
50 Last frame to read. By default (``None``) the entire trajectory is read.
51 frame_step
52 Read every :attr:`frame_step`-th step of the input trajectory.
53 By default (``1``) every frame is read. Must be larger than ``0``.
55 """
56 def __init__(
57 self,
58 filename: str,
59 trajectory_format: str,
60 atomic_indices: Union[str, Dict[str, List[int]]] = None,
61 length_unit: str = 'Angstrom',
62 time_unit: str = 'fs',
63 frame_start: int = 0,
64 frame_stop: int = None,
65 frame_step: int = 1
66 ):
68 if frame_start < 0:
69 raise ValueError('frame_start should be positive')
70 if frame_step < 0:
71 raise ValueError('frame_step should be positive')
73 self._frame_start = frame_start
74 self._frame_step = frame_step
75 self._frame_stop = frame_stop
77 # setup trajectory reader
78 if not isfile(filename):
79 raise IOError(f'File {filename} does not exist')
80 self._filename = filename
82 if trajectory_format == 'lammps_internal':
83 reader = LammpsTrajectoryReader
84 elif trajectory_format == 'extxyz':
85 reader = ExtxyzTrajectoryReader
86 elif trajectory_format == 'lammps_mdanalysis':
87 reader = MDAnalysisTrajectoryReader
88 trajectory_format = 'lammpsdump'
89 elif trajectory_format == 'ase': 89 ↛ 90line 89 didn't jump to line 90, because the condition on line 89 was never true
90 reader = ASETrajectoryReader
91 elif trajectory_format == 'lammps':
92 raise IOError('Ambiguous trajectory format, '
93 'did you mean lammps_internal or lammps_mdanalysis?')
94 else:
95 reader = MDAnalysisTrajectoryReader
97 logger.debug(f'Using trajectory reader: {reader.__name__}')
98 if reader == MDAnalysisTrajectoryReader:
99 self._reader_obj = reader(self._filename, trajectory_format,
100 length_unit=length_unit, time_unit=time_unit)
101 else:
102 self._reader_obj = reader(self._filename, length_unit=length_unit, time_unit=time_unit)
104 # Get two frames to set cell etc.
105 frame0 = next(self._reader_obj)
106 frame1 = next(self._reader_obj)
107 self._cell = frame0.cell
108 self._n_atoms = frame0.n_atoms
110 # Make sure cell is not changed during consecutive frames
111 if not np.allclose(frame0.cell, frame1.cell):
112 raise ValueError('The cell changes between the first and second frame. '
113 'The concept of q-points becomes muddy if the simulation cell is '
114 'changing, such as during NPT MD simulations, so trajectories where '
115 'the cell changes are not supported by dynasor.')
117 # setup iterator slice (reuse frame0 and frame1 via chain)
118 self.number_of_frames_read = 0
119 self.current_frame_index = 0
120 self._reader_iter = islice(chain([frame0, frame1], self._reader_obj),
121 self._frame_start, self._frame_stop, self._frame_step)
123 # setup atomic indices
124 if atomic_indices is None: # Default behaviour
125 atomic_indices = {'X': np.arange(0, self.n_atoms)}
126 elif isinstance(atomic_indices, str): # Str input
127 if atomic_indices == 'read_from_trajectory':
128 if frame0.atom_types is None:
129 raise ValueError('Could not read atomic indices from the trajectory.')
130 else:
131 uniques = np.unique(frame0.atom_types)
132 atomic_indices = {str(uniques[i]):
133 (frame0.atom_types == uniques[i]).nonzero()[0]
134 for i in range(len(uniques))}
135 else:
136 atomic_indices = parse_gromacs_index_file(atomic_indices)
137 elif isinstance(atomic_indices, dict): # Dict input
138 pass
139 else:
140 raise ValueError('Could not understand atomic_indices.')
141 self._atomic_indices = atomic_indices
143 # sanity checks for atomic_indices
144 for key, indices in self._atomic_indices.items():
145 if np.max(indices) > self.n_atoms:
146 raise ValueError('maximum index in atomic_indices exceeds number of atoms')
147 if np.min(indices) < 0:
148 raise ValueError('minimum index in atomic_indices is negative')
149 if '_' in key:
150 # Since '_' is what we use to distinguish atom types in the results, e.g. Sqw_Cs_Pb
151 raise ValueError('The char "_" is not allowed in atomic_indices.')
153 # log info on trajectory and atom types etc
154 logger.info(f'Trajectory file: {self.filename}')
155 logger.info(f'Total number of particles: {self.n_atoms}')
156 logger.info(f'Number of atom types: {len(self.atom_types)}')
157 for atom_type, indices in self._atomic_indices.items():
158 logger.info(f'Number of atoms of type {atom_type}: {len(indices)}')
159 logger.info(f'Simulation cell (in Angstrom):\n{str(self._cell)}')
161 def __iter__(self):
162 return self
164 def __next__(self):
165 frame = next(self._reader_iter)
166 new_frame = TrajectoryFrame(self.atomic_indices, frame.frame_index, frame.positions,
167 frame.velocities)
168 self.number_of_frames_read += 1
169 self.current_frame_index = frame.frame_index
170 return new_frame
172 def __str__(self) -> str:
173 s = ['Trajectory']
174 s += ['{:12} : {}'.format('filename', self.filename)]
175 s += ['{:12} : {}'.format('natoms', self.n_atoms)]
176 s += ['{:12} : {}'.format('frame_start', self._frame_start)]
177 s += ['{:12} : {}'.format('frame_stop', self._frame_stop)]
178 s += ['{:12} : {}'.format('frame_step', self.frame_step)]
179 s += ['{:12} : {}'.format('frame_index', self.current_frame_index)]
180 s += ['{:12} : [{}\n {}\n {}]'
181 .format('cell', self.cell[0], self.cell[1], self.cell[2])]
182 return '\n'.join(s)
184 def __repr__(self) -> str:
185 return str(self)
187 def _repr_html_(self) -> str:
188 s = [f'<h3>{self.__class__.__name__}</h3>']
189 s += ['<table border="1" class="dataframe">']
190 s += ['<thead><tr><th style="text-align: left;">Field</th><th>Value</th></tr></thead>']
191 s += ['<tbody>']
192 s += [f'<tr"><td style="text-align: left;">File name</td><td>{self.filename}</td></tr>']
193 s += [f'<tr><td style="text-align: left;">Number of atoms</td><td>{self.n_atoms}</td></tr>']
194 s += [f'<tr><td style="text-align: left;">Cell metric</td><td>{self.cell}</td></tr>']
195 s += [f'<tr><td style="text-align: left;">Frame step</td><td>{self.frame_step}</td></tr>']
196 s += [f'<tr><td style="text-align: left;">Atom types</td><td>{self.atom_types}</td></tr>']
197 s += ['</tbody>']
198 s += ['</table>']
199 return '\n'.join(s)
201 @property
202 def cell(self):
203 """ Simulation cell """
204 return self._cell
206 @property
207 def n_atoms(self):
208 """ Number of atoms """
209 return self._n_atoms
211 @property
212 def filename(self):
213 """ The trajectory filename """
214 return self._filename
216 @property
217 def atomic_indices(self):
218 """ Return copy of index arrays """
219 atomic_indices = dict()
220 for name, inds in self._atomic_indices.items():
221 atomic_indices[name] = inds.copy()
222 return atomic_indices
224 @property
225 def atom_types(self) -> List[str]:
226 return sorted(self._atomic_indices.keys())
228 @property
229 def frame_step(self):
230 """ Frame to access, trajectory will return every :attr:`frame_step`-th snapshot """
231 return self._frame_step
234def consume(iterator, n):
235 """ Advance the iterator by :attr:`n` steps. If :attr:`n` is ``None``, consume entirely. """
236 # From the python.org
237 if n is None: 237 ↛ 238line 237 didn't jump to line 238, because the condition on line 237 was never true
238 deque(iterator, maxlen=0)
239 else:
240 next(islice(iterator, n, n), None)
243class WindowIterator:
244 """Sliding window iterator.
246 Returns consecutive windows (a window is represented as a list
247 of objects), created from an input iterator.
249 Parameters
250 ----------
251 itraj
252 Trajectory object
253 width
254 Length of window (``window_size`` + 1)
255 window_step
256 Distance between the start of two consecutive window frames
257 element_processor
258 Enables processing each non-discarded object; useful if ``window_step >
259 width`` and ``map_item`` is expensive (as compared to directly passing
260 ``map(fun, itraj)`` as ``itraj``); if ``window_step < width``, you could as
261 well directly pass ``map(fun, itraj)``.
262 """
263 def __init__(self,
264 itraj: Trajectory,
265 width: int,
266 window_step: int = 1,
267 element_processor: Callable = None):
269 self._raw_it = itraj
270 if element_processor:
271 self._it = map(element_processor, self._raw_it)
272 else:
273 self._it = self._raw_it
274 assert window_step >= 1
275 assert width >= 1
276 self.width = width
277 self.window_step = window_step
278 self._window = None
280 def __iter__(self):
281 return self
283 def __next__(self):
284 """ Returns next element in sequence. """
285 if self._window is None:
286 self._window = deque(islice(self._it, self.width), self.width)
287 else:
288 if self.window_step >= self.width:
289 self._window.clear()
290 consume(self._raw_it, self.window_step - self.width)
291 else:
292 for _ in range(min((self.window_step, len(self._window)))):
293 self._window.popleft()
294 for f in islice(self._it, min((self.window_step, self.width))):
295 self._window.append(f)
297 if len(self._window) == 0:
298 raise StopIteration
300 return list(self._window)