Coverage for local_installation/dynasor/trajectory/mdanalysis_trajectory_reader.py: 100%
71 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-30 21:04 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-30 21:04 +0000
1from itertools import count
2from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader
3from dynasor.trajectory.trajectory_frame import ReaderFrame
4from dynasor.logging_tools import logger
5import MDAnalysis as mda
6import warnings
9warnings.filterwarnings('error', category=UserWarning)
12class MDAnalysisTrajectoryReader(AbstractTrajectoryReader):
13 """ Read a trajectory using the MDAnalysis Python library.
15 Parameters
16 ----------
17 filename
18 Name of input file.
19 trajectory_format
20 Type of trajectory. See MDAnalysis for the available formats.
21 length_unit
22 Length unit of trajectory. Necessary for correct conversion to internal dynasor units if
23 the trajectory file does not contain unit information.
24 For available options see MDAnalysis.
25 time_unit
26 Time unit of trajectory. Necessary for correct conversion to internal dynasor units if
27 the trajectory file does not contain unit information.
28 For available options see MDAnalysis.
29 """
31 def __init__(self,
32 filename: str,
33 trajectory_format: str,
34 length_unit: str = None,
35 time_unit: str = None):
37 self._open = True
38 self._first_called = False
40 with warnings.catch_warnings():
41 warnings.filterwarnings('ignore', category=UserWarning,
42 message='Guessed all Masses to 1.0')
43 u = mda.Universe(filename, format=trajectory_format, convert_units=False)
44 self._frame_index = count(0)
45 self._trajectory = u.trajectory
47 # Set atomic_indices dict, if possible
48 try:
49 self._atom_types = u.atoms.types
50 except mda.exceptions.NoDataError:
51 self._atom_types = None
53 trajectory_length_unit = self._trajectory.units['length']
54 trajectory_time_unit = self._trajectory.units['time']
56 if length_unit is None or time_unit is None: # No units from user
57 if trajectory_length_unit is None \
58 or trajectory_time_unit is None: # No units from trajectory either
59 raise IOError('Failed to read units from trajectory. Please specify units.')
61 else: # No units from user but units from trajectory
62 def convert_units(ts):
63 length_scaling = mda.units.get_conversion_factor('length',
64 trajectory_length_unit,
65 'Angstrom')
66 time_scaling = mda.units.get_conversion_factor('time',
67 trajectory_time_unit,
68 'fs')
69 ts.positions *= length_scaling
70 ts.triclinic_dimensions *= length_scaling
71 if ts.has_velocities:
72 ts.velocities *= length_scaling/time_scaling
73 return ts
74 self._trajectory.add_transformations(convert_units)
76 else: # Units from user, even if trajectory has units
77 if (length_unit != trajectory_length_unit and trajectory_length_unit is not None) or \
78 (time_unit != trajectory_time_unit and trajectory_time_unit is not None):
79 logger.warning(f'The units {length_unit} and {time_unit} were specified by user '
80 f'but the units {trajectory_length_unit} and {trajectory_time_unit} '
81 'were read from trajectory. Disregarding units read from trajectory '
82 f'and using the user-specified {length_unit} and {time_unit}.')
84 def convert_units(ts):
85 length_scaling = mda.units.get_conversion_factor('length', length_unit, 'Angstrom')
86 time_scaling = mda.units.get_conversion_factor('time', time_unit, 'fs')
87 ts.positions *= length_scaling
88 ts.triclinic_dimensions *= length_scaling
89 with warnings.catch_warnings():
90 warnings.filterwarnings('ignore', category=UserWarning,
91 message='Reader has no dt information, set to 1.0 ps')
92 if ts.has_velocities:
93 ts.velocities *= length_scaling/time_scaling
94 return ts
95 self._trajectory.add_transformations(convert_units)
97 def _get_next(self):
98 if self._first_called:
99 self._trajectory.next()
100 else:
101 self._first_called = True
102 self._positions = self._trajectory.ts.positions
103 self._cell = self._trajectory.ts.triclinic_dimensions
104 self._n_atoms = self._trajectory.ts.n_atoms
105 if self._trajectory.ts.has_velocities:
106 self._velocities = self._trajectory.ts.velocities
107 else:
108 self._velocities = None
110 def __iter__(self):
111 """ Iterates through the trajectory file, frame by frame. """
112 return self
114 def __next__(self):
115 """ Gets next trajectory frame. """
116 if not self._open:
117 raise StopIteration
119 self._get_next()
121 if self._velocities is not None:
122 frame = ReaderFrame(frame_index=next(self._frame_index),
123 cell=self._cell,
124 n_atoms=self._n_atoms,
125 positions=self._positions.copy(),
126 velocities=self._velocities.copy(),
127 atom_types=self._atom_types
128 )
129 else:
130 frame = ReaderFrame(frame_index=next(self._frame_index),
131 cell=self._cell,
132 n_atoms=self._n_atoms,
133 positions=self._positions.copy(),
134 atom_types=self._atom_types
135 )
137 return frame
139 def close(self):
140 """ Closes down, release resources etc. """
141 if self._open:
142 self._trajectory.close()
143 self._open = False