Coverage for local_installation/dynasor/trajectory/mdanalysis_trajectory_reader.py: 100%
71 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-09-27 15:43 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-09-27 15:43 +0000
1from itertools import count
2from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader
3from dynasor.trajectory.trajectory_frame import ReaderFrame
4from dynasor.logging_tools import logger
5import MDAnalysis as mda
6import warnings
9warnings.filterwarnings('error', category=UserWarning)
12class MDAnalysisTrajectoryReader(AbstractTrajectoryReader):
13 """ Read a trajectory using the MDAnalysis Python library.
15 Parameters
16 ----------
17 filename
18 Name of input file.
19 trajectory_format
20 Type of trajectory. See MDAnalysis for the available formats.
21 length_unit
22 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``).
23 time_unit
24 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``).
25 """
27 def __init__(self,
28 filename: str,
29 trajectory_format: str,
30 length_unit: str = 'Angstrom',
31 time_unit: str = 'fs'):
33 self._open = True
34 self._first_called = False
36 with warnings.catch_warnings():
37 warnings.filterwarnings('ignore', category=UserWarning,
38 message='Guessed all Masses to 1.0')
39 u = mda.Universe(filename, format=trajectory_format, convert_units=False)
40 self._frame_index = count(0)
41 self._trajectory = u.trajectory
43 # Set atomic_indices dict, if possible
44 try:
45 self._atom_types = u.atoms.types
46 except mda.exceptions.NoDataError:
47 self._atom_types = None
49 trajectory_length_unit = self._trajectory.units['length']
50 trajectory_time_unit = self._trajectory.units['time']
52 if trajectory_length_unit is None or trajectory_time_unit is None: # No units from traj...
53 if length_unit not in self.lengthunits_to_nm_table \
54 or time_unit not in self.timeunits_to_fs_table: # ... and incorrect units from user
55 raise ValueError('Trajectory contains no unit information and specified units not '
56 'recognized. Please check the available units.')
58 else: # ... but correct units from user
59 def convert_units(ts):
60 length_scaling = mda.units.get_conversion_factor('length',
61 length_unit,
62 'Angstrom')
63 time_scaling = mda.units.get_conversion_factor('time',
64 time_unit,
65 'fs')
66 ts.positions *= length_scaling
67 ts.triclinic_dimensions *= length_scaling
68 if ts.has_velocities:
69 ts.velocities *= length_scaling/time_scaling
70 return ts
71 self._trajectory.add_transformations(convert_units)
73 else: # Units from trajectory
74 if (length_unit != trajectory_length_unit and trajectory_length_unit is not None) or \
75 (time_unit != trajectory_time_unit and trajectory_time_unit is not None):
76 logger.warning(f'The units {length_unit} and {time_unit} were specified by user '
77 f'but the units {trajectory_length_unit} and {trajectory_time_unit} '
78 'were read from trajectory. Disregarding user-specified units and '
79 f'using {trajectory_length_unit} and {trajectory_time_unit}.')
81 def convert_units(ts):
82 length_scaling = mda.units.get_conversion_factor('length', trajectory_length_unit,
83 'Angstrom')
84 time_scaling = mda.units.get_conversion_factor('time', trajectory_time_unit, 'fs')
85 ts.positions *= length_scaling
86 ts.triclinic_dimensions *= length_scaling
87 with warnings.catch_warnings():
88 warnings.filterwarnings('ignore', category=UserWarning,
89 message='Reader has no dt information, set to 1.0 ps')
90 if ts.has_velocities:
91 ts.velocities *= length_scaling/time_scaling
92 return ts
93 self._trajectory.add_transformations(convert_units)
95 def _get_next(self):
96 if self._first_called:
97 self._trajectory.next()
98 else:
99 self._first_called = True
100 self._positions = self._trajectory.ts.positions
101 self._cell = self._trajectory.ts.triclinic_dimensions
102 self._n_atoms = self._trajectory.ts.n_atoms
103 if self._trajectory.ts.has_velocities:
104 self._velocities = self._trajectory.ts.velocities
105 else:
106 self._velocities = None
108 def __iter__(self):
109 """ Iterates through the trajectory file, frame by frame. """
110 return self
112 def __next__(self):
113 """ Gets next trajectory frame. """
114 if not self._open:
115 raise StopIteration
117 self._get_next()
119 if self._velocities is not None:
120 frame = ReaderFrame(frame_index=next(self._frame_index),
121 cell=self._cell,
122 n_atoms=self._n_atoms,
123 positions=self._positions.copy(),
124 velocities=self._velocities.copy(),
125 atom_types=self._atom_types
126 )
127 else:
128 frame = ReaderFrame(frame_index=next(self._frame_index),
129 cell=self._cell,
130 n_atoms=self._n_atoms,
131 positions=self._positions.copy(),
132 atom_types=self._atom_types
133 )
135 return frame
137 def close(self):
138 """ Closes down, release resources etc. """
139 if self._open:
140 self._trajectory.close()
141 self._open = False