Coverage for dynasor / trajectory / mdanalysis_trajectory_reader.py: 98%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 12:31 +0000

1from itertools import count 

2from typing import Optional 

3from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader 

4from dynasor.trajectory.trajectory_frame import ReaderFrame 

5from dynasor.logging_tools import logger 

6import MDAnalysis as mda 

7import warnings 

8 

9 

10class MDAnalysisTrajectoryReader(AbstractTrajectoryReader): 

11 """ Read a trajectory using the MDAnalysis Python library. 

12 

13 Parameters 

14 ---------- 

15 filename 

16 Name of input file. 

17 trajectory_format 

18 Type of trajectory. See MDAnalysis for the available formats. 

19 length_unit 

20 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``). 

21 time_unit 

22 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``). 

23 """ 

24 

25 def __init__(self, 

26 filename: str, 

27 trajectory_format: str, 

28 length_unit: Optional[str] = None, 

29 time_unit: Optional[str] = None): 

30 

31 self._open = True 

32 self._first_called = False 

33 

34 with warnings.catch_warnings(): 

35 warnings.filterwarnings('ignore', category=UserWarning, 

36 message='Guessed all Masses to 1.0') 

37 warnings.filterwarnings('ignore', category=UserWarning, 

38 message='Reader has no dt information, set to 1.0 ps') 

39 u = mda.Universe(filename, format=trajectory_format, convert_units=False) 

40 self._frame_index = count(0) 

41 self._trajectory = u.trajectory 

42 

43 # Set atomic_indices dict, if possible 

44 try: 

45 self._atom_types = u.atoms.types 

46 except mda.exceptions.NoDataError: 

47 self._atom_types = None 

48 

49 trajectory_length_unit = self._trajectory.units['length'] 

50 trajectory_time_unit = self._trajectory.units['time'] 

51 

52 if length_unit is not None and time_unit is not None: # User has specified units... 

53 if length_unit not in self.lengthunits_to_Angstrom_table \ 

54 or time_unit not in self.timeunits_to_fs_table: # ... but these are unknown 

55 raise ValueError('Unknown length and/or time unit. Please check the available ' 

56 'options.') 

57 else: # ... and these are correct 

58 def convert_units(ts): 

59 length_scaling = mda.units.get_conversion_factor('length', 

60 length_unit, 

61 'Angstrom') 

62 time_scaling = mda.units.get_conversion_factor('time', 

63 time_unit, 

64 'fs') 

65 ts.positions *= length_scaling 

66 ts.triclinic_dimensions *= length_scaling 

67 if ts.has_velocities: 

68 ts.velocities *= length_scaling/time_scaling 

69 return ts 

70 self._trajectory.add_transformations(convert_units) 

71 elif trajectory_length_unit is not None and trajectory_time_unit is not None: # Read units 71 ↛ 86line 71 didn't jump to line 86 because the condition on line 71 was always true

72 logger.info(f'Length unit {trajectory_length_unit} and time unit {trajectory_time_unit}' 

73 ' inferred from trajectory.') 

74 

75 def convert_units(ts): 

76 length_scaling = mda.units.get_conversion_factor('length', trajectory_length_unit, 

77 'Angstrom') 

78 time_scaling = mda.units.get_conversion_factor('time', trajectory_time_unit, 'fs') 

79 ts.positions *= length_scaling 

80 ts.triclinic_dimensions *= length_scaling 

81 if ts.has_velocities: 

82 ts.velocities *= length_scaling/time_scaling 

83 return ts 

84 self._trajectory.add_transformations(convert_units) 

85 else: # No units from user and reader cannot read units 

86 logger.info('Assuming the trajectory has the default units (Ångström and fs), since no ' 

87 'units were specified and units could not be inferred from the trajectory.') 

88 

89 def _get_next(self): 

90 with warnings.catch_warnings(): 

91 warnings.filterwarnings('ignore', category=UserWarning, 

92 message='Reader has no dt information, set to 1.0 ps') 

93 if self._first_called: 

94 self._trajectory.next() 

95 else: 

96 self._first_called = True 

97 self._positions = self._trajectory.ts.positions 

98 self._cell = self._trajectory.ts.triclinic_dimensions 

99 self._n_atoms = self._trajectory.ts.n_atoms 

100 if self._trajectory.ts.has_velocities: 

101 self._velocities = self._trajectory.ts.velocities 

102 else: 

103 self._velocities = None 

104 

105 def __iter__(self): 

106 """ Iterates through the trajectory file, frame by frame. """ 

107 return self 

108 

109 def __next__(self): 

110 """ Gets next trajectory frame. """ 

111 if not self._open: 

112 raise StopIteration 

113 

114 self._get_next() 

115 

116 if self._velocities is not None: 

117 frame = ReaderFrame(frame_index=next(self._frame_index), 

118 cell=self._cell, 

119 n_atoms=self._n_atoms, 

120 positions=self._positions.copy(), 

121 velocities=self._velocities.copy(), 

122 atom_types=self._atom_types 

123 ) 

124 else: 

125 frame = ReaderFrame(frame_index=next(self._frame_index), 

126 cell=self._cell, 

127 n_atoms=self._n_atoms, 

128 positions=self._positions.copy(), 

129 atom_types=self._atom_types 

130 ) 

131 

132 return frame 

133 

134 def close(self): 

135 """ Closes down, release resources etc. """ 

136 if self._open: 

137 self._trajectory.close() 

138 self._open = False