Coverage for dynasor / trajectory / mdanalysis_trajectory_reader.py: 98%
73 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 12:31 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 12:31 +0000
1from itertools import count
2from typing import Optional
3from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader
4from dynasor.trajectory.trajectory_frame import ReaderFrame
5from dynasor.logging_tools import logger
6import MDAnalysis as mda
7import warnings
10class MDAnalysisTrajectoryReader(AbstractTrajectoryReader):
11 """ Read a trajectory using the MDAnalysis Python library.
13 Parameters
14 ----------
15 filename
16 Name of input file.
17 trajectory_format
18 Type of trajectory. See MDAnalysis for the available formats.
19 length_unit
20 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``).
21 time_unit
22 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``).
23 """
25 def __init__(self,
26 filename: str,
27 trajectory_format: str,
28 length_unit: Optional[str] = None,
29 time_unit: Optional[str] = None):
31 self._open = True
32 self._first_called = False
34 with warnings.catch_warnings():
35 warnings.filterwarnings('ignore', category=UserWarning,
36 message='Guessed all Masses to 1.0')
37 warnings.filterwarnings('ignore', category=UserWarning,
38 message='Reader has no dt information, set to 1.0 ps')
39 u = mda.Universe(filename, format=trajectory_format, convert_units=False)
40 self._frame_index = count(0)
41 self._trajectory = u.trajectory
43 # Set atomic_indices dict, if possible
44 try:
45 self._atom_types = u.atoms.types
46 except mda.exceptions.NoDataError:
47 self._atom_types = None
49 trajectory_length_unit = self._trajectory.units['length']
50 trajectory_time_unit = self._trajectory.units['time']
52 if length_unit is not None and time_unit is not None: # User has specified units...
53 if length_unit not in self.lengthunits_to_Angstrom_table \
54 or time_unit not in self.timeunits_to_fs_table: # ... but these are unknown
55 raise ValueError('Unknown length and/or time unit. Please check the available '
56 'options.')
57 else: # ... and these are correct
58 def convert_units(ts):
59 length_scaling = mda.units.get_conversion_factor('length',
60 length_unit,
61 'Angstrom')
62 time_scaling = mda.units.get_conversion_factor('time',
63 time_unit,
64 'fs')
65 ts.positions *= length_scaling
66 ts.triclinic_dimensions *= length_scaling
67 if ts.has_velocities:
68 ts.velocities *= length_scaling/time_scaling
69 return ts
70 self._trajectory.add_transformations(convert_units)
71 elif trajectory_length_unit is not None and trajectory_time_unit is not None: # Read units 71 ↛ 86line 71 didn't jump to line 86 because the condition on line 71 was always true
72 logger.info(f'Length unit {trajectory_length_unit} and time unit {trajectory_time_unit}'
73 ' inferred from trajectory.')
75 def convert_units(ts):
76 length_scaling = mda.units.get_conversion_factor('length', trajectory_length_unit,
77 'Angstrom')
78 time_scaling = mda.units.get_conversion_factor('time', trajectory_time_unit, 'fs')
79 ts.positions *= length_scaling
80 ts.triclinic_dimensions *= length_scaling
81 if ts.has_velocities:
82 ts.velocities *= length_scaling/time_scaling
83 return ts
84 self._trajectory.add_transformations(convert_units)
85 else: # No units from user and reader cannot read units
86 logger.info('Assuming the trajectory has the default units (Ångström and fs), since no '
87 'units were specified and units could not be inferred from the trajectory.')
89 def _get_next(self):
90 with warnings.catch_warnings():
91 warnings.filterwarnings('ignore', category=UserWarning,
92 message='Reader has no dt information, set to 1.0 ps')
93 if self._first_called:
94 self._trajectory.next()
95 else:
96 self._first_called = True
97 self._positions = self._trajectory.ts.positions
98 self._cell = self._trajectory.ts.triclinic_dimensions
99 self._n_atoms = self._trajectory.ts.n_atoms
100 if self._trajectory.ts.has_velocities:
101 self._velocities = self._trajectory.ts.velocities
102 else:
103 self._velocities = None
105 def __iter__(self):
106 """ Iterates through the trajectory file, frame by frame. """
107 return self
109 def __next__(self):
110 """ Gets next trajectory frame. """
111 if not self._open:
112 raise StopIteration
114 self._get_next()
116 if self._velocities is not None:
117 frame = ReaderFrame(frame_index=next(self._frame_index),
118 cell=self._cell,
119 n_atoms=self._n_atoms,
120 positions=self._positions.copy(),
121 velocities=self._velocities.copy(),
122 atom_types=self._atom_types
123 )
124 else:
125 frame = ReaderFrame(frame_index=next(self._frame_index),
126 cell=self._cell,
127 n_atoms=self._n_atoms,
128 positions=self._positions.copy(),
129 atom_types=self._atom_types
130 )
132 return frame
134 def close(self):
135 """ Closes down, release resources etc. """
136 if self._open:
137 self._trajectory.close()
138 self._open = False