Coverage for local_installation/dynasor/trajectory/mdanalysis_trajectory_reader.py: 100%

71 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-12-21 12:02 +0000

1from itertools import count 

2from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader 

3from dynasor.trajectory.trajectory_frame import ReaderFrame 

4from dynasor.logging_tools import logger 

5import MDAnalysis as mda 

6import warnings 

7 

8 

9class MDAnalysisTrajectoryReader(AbstractTrajectoryReader): 

10 """ Read a trajectory using the MDAnalysis Python library. 

11 

12 Parameters 

13 ---------- 

14 filename 

15 Name of input file. 

16 trajectory_format 

17 Type of trajectory. See MDAnalysis for the available formats. 

18 length_unit 

19 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``). 

20 time_unit 

21 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``). 

22 """ 

23 

24 def __init__(self, 

25 filename: str, 

26 trajectory_format: str, 

27 length_unit: str = 'Angstrom', 

28 time_unit: str = 'fs'): 

29 

30 self._open = True 

31 self._first_called = False 

32 

33 with warnings.catch_warnings(): 

34 warnings.filterwarnings('ignore', category=UserWarning, 

35 message='Guessed all Masses to 1.0') 

36 warnings.filterwarnings('ignore', category=UserWarning, 

37 message='Reader has no dt information, set to 1.0 ps') 

38 u = mda.Universe(filename, format=trajectory_format, convert_units=False) 

39 self._frame_index = count(0) 

40 self._trajectory = u.trajectory 

41 

42 # Set atomic_indices dict, if possible 

43 try: 

44 self._atom_types = u.atoms.types 

45 except mda.exceptions.NoDataError: 

46 self._atom_types = None 

47 

48 trajectory_length_unit = self._trajectory.units['length'] 

49 trajectory_time_unit = self._trajectory.units['time'] 

50 

51 if trajectory_length_unit is None or trajectory_time_unit is None: # No units from traj... 

52 if length_unit not in self.lengthunits_to_nm_table \ 

53 or time_unit not in self.timeunits_to_fs_table: # ... and incorrect units from user 

54 raise ValueError('Trajectory contains no unit information and specified units not ' 

55 'recognized. Please check the available units.') 

56 

57 else: # ... but correct units from user 

58 def convert_units(ts): 

59 length_scaling = mda.units.get_conversion_factor('length', 

60 length_unit, 

61 'Angstrom') 

62 time_scaling = mda.units.get_conversion_factor('time', 

63 time_unit, 

64 'fs') 

65 ts.positions *= length_scaling 

66 ts.triclinic_dimensions *= length_scaling 

67 if ts.has_velocities: 

68 ts.velocities *= length_scaling/time_scaling 

69 return ts 

70 self._trajectory.add_transformations(convert_units) 

71 else: # Units from trajectory 

72 if (length_unit != trajectory_length_unit and trajectory_length_unit is not None) or \ 

73 (time_unit != trajectory_time_unit and trajectory_time_unit is not None): 

74 logger.warning(f'The units {length_unit} and {time_unit} were specified by user ' 

75 f'but the units {trajectory_length_unit} and {trajectory_time_unit} ' 

76 'were read from trajectory. Disregarding user-specified units and ' 

77 f'using {trajectory_length_unit} and {trajectory_time_unit}.') 

78 

79 def convert_units(ts): 

80 length_scaling = mda.units.get_conversion_factor('length', trajectory_length_unit, 

81 'Angstrom') 

82 time_scaling = mda.units.get_conversion_factor('time', trajectory_time_unit, 'fs') 

83 ts.positions *= length_scaling 

84 ts.triclinic_dimensions *= length_scaling 

85 if ts.has_velocities: 

86 ts.velocities *= length_scaling/time_scaling 

87 return ts 

88 self._trajectory.add_transformations(convert_units) 

89 

90 def _get_next(self): 

91 with warnings.catch_warnings(): 

92 warnings.filterwarnings('ignore', category=UserWarning, 

93 message='Reader has no dt information, set to 1.0 ps') 

94 if self._first_called: 

95 self._trajectory.next() 

96 else: 

97 self._first_called = True 

98 self._positions = self._trajectory.ts.positions 

99 self._cell = self._trajectory.ts.triclinic_dimensions 

100 self._n_atoms = self._trajectory.ts.n_atoms 

101 if self._trajectory.ts.has_velocities: 

102 self._velocities = self._trajectory.ts.velocities 

103 else: 

104 self._velocities = None 

105 

106 def __iter__(self): 

107 """ Iterates through the trajectory file, frame by frame. """ 

108 return self 

109 

110 def __next__(self): 

111 """ Gets next trajectory frame. """ 

112 if not self._open: 

113 raise StopIteration 

114 

115 self._get_next() 

116 

117 if self._velocities is not None: 

118 frame = ReaderFrame(frame_index=next(self._frame_index), 

119 cell=self._cell, 

120 n_atoms=self._n_atoms, 

121 positions=self._positions.copy(), 

122 velocities=self._velocities.copy(), 

123 atom_types=self._atom_types 

124 ) 

125 else: 

126 frame = ReaderFrame(frame_index=next(self._frame_index), 

127 cell=self._cell, 

128 n_atoms=self._n_atoms, 

129 positions=self._positions.copy(), 

130 atom_types=self._atom_types 

131 ) 

132 

133 return frame 

134 

135 def close(self): 

136 """ Closes down, release resources etc. """ 

137 if self._open: 

138 self._trajectory.close() 

139 self._open = False