Coverage for local_installation/dynasor/trajectory/mdanalysis_trajectory_reader.py: 100%

71 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-08-05 09:53 +0000

1from itertools import count 

2from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader 

3from dynasor.trajectory.trajectory_frame import ReaderFrame 

4from dynasor.logging_tools import logger 

5import MDAnalysis as mda 

6import warnings 

7 

8 

9warnings.filterwarnings('error', category=UserWarning) 

10 

11 

12class MDAnalysisTrajectoryReader(AbstractTrajectoryReader): 

13 """ Read a trajectory using the MDAnalysis Python library. 

14 

15 Parameters 

16 ---------- 

17 filename 

18 Name of input file. 

19 trajectory_format 

20 Type of trajectory. See MDAnalysis for the available formats. 

21 length_unit 

22 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``). 

23 time_unit 

24 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``). 

25 """ 

26 

27 def __init__(self, 

28 filename: str, 

29 trajectory_format: str, 

30 length_unit: str = 'Angstrom', 

31 time_unit: str = 'fs'): 

32 

33 self._open = True 

34 self._first_called = False 

35 

36 with warnings.catch_warnings(): 

37 warnings.filterwarnings('ignore', category=UserWarning, 

38 message='Guessed all Masses to 1.0') 

39 u = mda.Universe(filename, format=trajectory_format, convert_units=False) 

40 self._frame_index = count(0) 

41 self._trajectory = u.trajectory 

42 

43 # Set atomic_indices dict, if possible 

44 try: 

45 self._atom_types = u.atoms.types 

46 except mda.exceptions.NoDataError: 

47 self._atom_types = None 

48 

49 trajectory_length_unit = self._trajectory.units['length'] 

50 trajectory_time_unit = self._trajectory.units['time'] 

51 

52 if trajectory_length_unit is None or trajectory_time_unit is None: # No units from traj... 

53 if length_unit not in self.lengthunits_to_nm_table \ 

54 or time_unit not in self.timeunits_to_fs_table: # ... and incorrect units from user 

55 raise ValueError('Trajectory contains no unit information and specified units not ' 

56 'recognized. Please check the available units.') 

57 

58 else: # ... but correct units from user 

59 def convert_units(ts): 

60 length_scaling = mda.units.get_conversion_factor('length', 

61 length_unit, 

62 'Angstrom') 

63 time_scaling = mda.units.get_conversion_factor('time', 

64 time_unit, 

65 'fs') 

66 ts.positions *= length_scaling 

67 ts.triclinic_dimensions *= length_scaling 

68 if ts.has_velocities: 

69 ts.velocities *= length_scaling/time_scaling 

70 return ts 

71 self._trajectory.add_transformations(convert_units) 

72 

73 else: # Units from trajectory 

74 if (length_unit != trajectory_length_unit and trajectory_length_unit is not None) or \ 

75 (time_unit != trajectory_time_unit and trajectory_time_unit is not None): 

76 logger.warning(f'The units {length_unit} and {time_unit} were specified by user ' 

77 f'but the units {trajectory_length_unit} and {trajectory_time_unit} ' 

78 'were read from trajectory. Disregarding user-specified units and ' 

79 f'using {trajectory_length_unit} and {trajectory_time_unit}.') 

80 

81 def convert_units(ts): 

82 length_scaling = mda.units.get_conversion_factor('length', trajectory_length_unit, 

83 'Angstrom') 

84 time_scaling = mda.units.get_conversion_factor('time', trajectory_time_unit, 'fs') 

85 ts.positions *= length_scaling 

86 ts.triclinic_dimensions *= length_scaling 

87 with warnings.catch_warnings(): 

88 warnings.filterwarnings('ignore', category=UserWarning, 

89 message='Reader has no dt information, set to 1.0 ps') 

90 if ts.has_velocities: 

91 ts.velocities *= length_scaling/time_scaling 

92 return ts 

93 self._trajectory.add_transformations(convert_units) 

94 

95 def _get_next(self): 

96 if self._first_called: 

97 self._trajectory.next() 

98 else: 

99 self._first_called = True 

100 self._positions = self._trajectory.ts.positions 

101 self._cell = self._trajectory.ts.triclinic_dimensions 

102 self._n_atoms = self._trajectory.ts.n_atoms 

103 if self._trajectory.ts.has_velocities: 

104 self._velocities = self._trajectory.ts.velocities 

105 else: 

106 self._velocities = None 

107 

108 def __iter__(self): 

109 """ Iterates through the trajectory file, frame by frame. """ 

110 return self 

111 

112 def __next__(self): 

113 """ Gets next trajectory frame. """ 

114 if not self._open: 

115 raise StopIteration 

116 

117 self._get_next() 

118 

119 if self._velocities is not None: 

120 frame = ReaderFrame(frame_index=next(self._frame_index), 

121 cell=self._cell, 

122 n_atoms=self._n_atoms, 

123 positions=self._positions.copy(), 

124 velocities=self._velocities.copy(), 

125 atom_types=self._atom_types 

126 ) 

127 else: 

128 frame = ReaderFrame(frame_index=next(self._frame_index), 

129 cell=self._cell, 

130 n_atoms=self._n_atoms, 

131 positions=self._positions.copy(), 

132 atom_types=self._atom_types 

133 ) 

134 

135 return frame 

136 

137 def close(self): 

138 """ Closes down, release resources etc. """ 

139 if self._open: 

140 self._trajectory.close() 

141 self._open = False