Coverage for local_installation/dynasor/trajectory/mdanalysis_trajectory_reader.py: 100%
71 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-12-21 12:02 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-12-21 12:02 +0000
1from itertools import count
2from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader
3from dynasor.trajectory.trajectory_frame import ReaderFrame
4from dynasor.logging_tools import logger
5import MDAnalysis as mda
6import warnings
9class MDAnalysisTrajectoryReader(AbstractTrajectoryReader):
10 """ Read a trajectory using the MDAnalysis Python library.
12 Parameters
13 ----------
14 filename
15 Name of input file.
16 trajectory_format
17 Type of trajectory. See MDAnalysis for the available formats.
18 length_unit
19 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``).
20 time_unit
21 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``).
22 """
24 def __init__(self,
25 filename: str,
26 trajectory_format: str,
27 length_unit: str = 'Angstrom',
28 time_unit: str = 'fs'):
30 self._open = True
31 self._first_called = False
33 with warnings.catch_warnings():
34 warnings.filterwarnings('ignore', category=UserWarning,
35 message='Guessed all Masses to 1.0')
36 warnings.filterwarnings('ignore', category=UserWarning,
37 message='Reader has no dt information, set to 1.0 ps')
38 u = mda.Universe(filename, format=trajectory_format, convert_units=False)
39 self._frame_index = count(0)
40 self._trajectory = u.trajectory
42 # Set atomic_indices dict, if possible
43 try:
44 self._atom_types = u.atoms.types
45 except mda.exceptions.NoDataError:
46 self._atom_types = None
48 trajectory_length_unit = self._trajectory.units['length']
49 trajectory_time_unit = self._trajectory.units['time']
51 if trajectory_length_unit is None or trajectory_time_unit is None: # No units from traj...
52 if length_unit not in self.lengthunits_to_nm_table \
53 or time_unit not in self.timeunits_to_fs_table: # ... and incorrect units from user
54 raise ValueError('Trajectory contains no unit information and specified units not '
55 'recognized. Please check the available units.')
57 else: # ... but correct units from user
58 def convert_units(ts):
59 length_scaling = mda.units.get_conversion_factor('length',
60 length_unit,
61 'Angstrom')
62 time_scaling = mda.units.get_conversion_factor('time',
63 time_unit,
64 'fs')
65 ts.positions *= length_scaling
66 ts.triclinic_dimensions *= length_scaling
67 if ts.has_velocities:
68 ts.velocities *= length_scaling/time_scaling
69 return ts
70 self._trajectory.add_transformations(convert_units)
71 else: # Units from trajectory
72 if (length_unit != trajectory_length_unit and trajectory_length_unit is not None) or \
73 (time_unit != trajectory_time_unit and trajectory_time_unit is not None):
74 logger.warning(f'The units {length_unit} and {time_unit} were specified by user '
75 f'but the units {trajectory_length_unit} and {trajectory_time_unit} '
76 'were read from trajectory. Disregarding user-specified units and '
77 f'using {trajectory_length_unit} and {trajectory_time_unit}.')
79 def convert_units(ts):
80 length_scaling = mda.units.get_conversion_factor('length', trajectory_length_unit,
81 'Angstrom')
82 time_scaling = mda.units.get_conversion_factor('time', trajectory_time_unit, 'fs')
83 ts.positions *= length_scaling
84 ts.triclinic_dimensions *= length_scaling
85 if ts.has_velocities:
86 ts.velocities *= length_scaling/time_scaling
87 return ts
88 self._trajectory.add_transformations(convert_units)
90 def _get_next(self):
91 with warnings.catch_warnings():
92 warnings.filterwarnings('ignore', category=UserWarning,
93 message='Reader has no dt information, set to 1.0 ps')
94 if self._first_called:
95 self._trajectory.next()
96 else:
97 self._first_called = True
98 self._positions = self._trajectory.ts.positions
99 self._cell = self._trajectory.ts.triclinic_dimensions
100 self._n_atoms = self._trajectory.ts.n_atoms
101 if self._trajectory.ts.has_velocities:
102 self._velocities = self._trajectory.ts.velocities
103 else:
104 self._velocities = None
106 def __iter__(self):
107 """ Iterates through the trajectory file, frame by frame. """
108 return self
110 def __next__(self):
111 """ Gets next trajectory frame. """
112 if not self._open:
113 raise StopIteration
115 self._get_next()
117 if self._velocities is not None:
118 frame = ReaderFrame(frame_index=next(self._frame_index),
119 cell=self._cell,
120 n_atoms=self._n_atoms,
121 positions=self._positions.copy(),
122 velocities=self._velocities.copy(),
123 atom_types=self._atom_types
124 )
125 else:
126 frame = ReaderFrame(frame_index=next(self._frame_index),
127 cell=self._cell,
128 n_atoms=self._n_atoms,
129 positions=self._positions.copy(),
130 atom_types=self._atom_types
131 )
133 return frame
135 def close(self):
136 """ Closes down, release resources etc. """
137 if self._open:
138 self._trajectory.close()
139 self._open = False