Coverage for local_installation/dynasor/trajectory/mdanalysis_trajectory_reader.py: 100%

71 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-11-30 21:04 +0000

1from itertools import count 

2from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader 

3from dynasor.trajectory.trajectory_frame import ReaderFrame 

4from dynasor.logging_tools import logger 

5import MDAnalysis as mda 

6import warnings 

7 

8 

9warnings.filterwarnings('error', category=UserWarning) 

10 

11 

12class MDAnalysisTrajectoryReader(AbstractTrajectoryReader): 

13 """ Read a trajectory using the MDAnalysis Python library. 

14 

15 Parameters 

16 ---------- 

17 filename 

18 Name of input file. 

19 trajectory_format 

20 Type of trajectory. See MDAnalysis for the available formats. 

21 length_unit 

22 Length unit of trajectory. Necessary for correct conversion to internal dynasor units if 

23 the trajectory file does not contain unit information. 

24 For available options see MDAnalysis. 

25 time_unit 

26 Time unit of trajectory. Necessary for correct conversion to internal dynasor units if 

27 the trajectory file does not contain unit information. 

28 For available options see MDAnalysis. 

29 """ 

30 

31 def __init__(self, 

32 filename: str, 

33 trajectory_format: str, 

34 length_unit: str = None, 

35 time_unit: str = None): 

36 

37 self._open = True 

38 self._first_called = False 

39 

40 with warnings.catch_warnings(): 

41 warnings.filterwarnings('ignore', category=UserWarning, 

42 message='Guessed all Masses to 1.0') 

43 u = mda.Universe(filename, format=trajectory_format, convert_units=False) 

44 self._frame_index = count(0) 

45 self._trajectory = u.trajectory 

46 

47 # Set atomic_indices dict, if possible 

48 try: 

49 self._atom_types = u.atoms.types 

50 except mda.exceptions.NoDataError: 

51 self._atom_types = None 

52 

53 trajectory_length_unit = self._trajectory.units['length'] 

54 trajectory_time_unit = self._trajectory.units['time'] 

55 

56 if length_unit is None or time_unit is None: # No units from user 

57 if trajectory_length_unit is None \ 

58 or trajectory_time_unit is None: # No units from trajectory either 

59 raise IOError('Failed to read units from trajectory. Please specify units.') 

60 

61 else: # No units from user but units from trajectory 

62 def convert_units(ts): 

63 length_scaling = mda.units.get_conversion_factor('length', 

64 trajectory_length_unit, 

65 'Angstrom') 

66 time_scaling = mda.units.get_conversion_factor('time', 

67 trajectory_time_unit, 

68 'fs') 

69 ts.positions *= length_scaling 

70 ts.triclinic_dimensions *= length_scaling 

71 if ts.has_velocities: 

72 ts.velocities *= length_scaling/time_scaling 

73 return ts 

74 self._trajectory.add_transformations(convert_units) 

75 

76 else: # Units from user, even if trajectory has units 

77 if (length_unit != trajectory_length_unit and trajectory_length_unit is not None) or \ 

78 (time_unit != trajectory_time_unit and trajectory_time_unit is not None): 

79 logger.warning(f'The units {length_unit} and {time_unit} were specified by user ' 

80 f'but the units {trajectory_length_unit} and {trajectory_time_unit} ' 

81 'were read from trajectory. Disregarding units read from trajectory ' 

82 f'and using the user-specified {length_unit} and {time_unit}.') 

83 

84 def convert_units(ts): 

85 length_scaling = mda.units.get_conversion_factor('length', length_unit, 'Angstrom') 

86 time_scaling = mda.units.get_conversion_factor('time', time_unit, 'fs') 

87 ts.positions *= length_scaling 

88 ts.triclinic_dimensions *= length_scaling 

89 with warnings.catch_warnings(): 

90 warnings.filterwarnings('ignore', category=UserWarning, 

91 message='Reader has no dt information, set to 1.0 ps') 

92 if ts.has_velocities: 

93 ts.velocities *= length_scaling/time_scaling 

94 return ts 

95 self._trajectory.add_transformations(convert_units) 

96 

97 def _get_next(self): 

98 if self._first_called: 

99 self._trajectory.next() 

100 else: 

101 self._first_called = True 

102 self._positions = self._trajectory.ts.positions 

103 self._cell = self._trajectory.ts.triclinic_dimensions 

104 self._n_atoms = self._trajectory.ts.n_atoms 

105 if self._trajectory.ts.has_velocities: 

106 self._velocities = self._trajectory.ts.velocities 

107 else: 

108 self._velocities = None 

109 

110 def __iter__(self): 

111 """ Iterates through the trajectory file, frame by frame. """ 

112 return self 

113 

114 def __next__(self): 

115 """ Gets next trajectory frame. """ 

116 if not self._open: 

117 raise StopIteration 

118 

119 self._get_next() 

120 

121 if self._velocities is not None: 

122 frame = ReaderFrame(frame_index=next(self._frame_index), 

123 cell=self._cell, 

124 n_atoms=self._n_atoms, 

125 positions=self._positions.copy(), 

126 velocities=self._velocities.copy(), 

127 atom_types=self._atom_types 

128 ) 

129 else: 

130 frame = ReaderFrame(frame_index=next(self._frame_index), 

131 cell=self._cell, 

132 n_atoms=self._n_atoms, 

133 positions=self._positions.copy(), 

134 atom_types=self._atom_types 

135 ) 

136 

137 return frame 

138 

139 def close(self): 

140 """ Closes down, release resources etc. """ 

141 if self._open: 

142 self._trajectory.close() 

143 self._open = False