Coverage for local_installation/dynasor/trajectory/lammps_trajectory_reader.py: 77%

123 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-06-13 16:00 +0000

1import numpy as np 

2import re 

3 

4from collections import deque 

5from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader 

6from dynasor.trajectory.trajectory_frame import ReaderFrame 

7from itertools import count 

8from numpy import array, arange, zeros 

9 

10 

11class LammpsTrajectoryReader(AbstractTrajectoryReader): 

12 """Read LAMMPS trajectory file 

13 

14 This is a naive (and comparatively slow) implementation, 

15 written entirely in python. 

16 

17 Parameters 

18 ---------- 

19 filename 

20 Name of input file. 

21 length_unit 

22 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``). 

23 time_unit 

24 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``). 

25 """ 

26 

27 def __init__( 

28 self, 

29 filename: str, 

30 length_unit: str = 'Angstrom', 

31 time_unit: str = 'fs' 

32 ): 

33 

34 if filename.endswith('.gz'): 34 ↛ 35line 34 didn't jump to line 35, because the condition on line 34 was never true

35 import gzip 

36 self._fh = gzip.open(filename, 'rt') 

37 elif filename.endswith('.bz2'): 37 ↛ 38line 37 didn't jump to line 38, because the condition on line 37 was never true

38 import bz2 

39 self._fh = bz2.open(filename, 'rt') 

40 else: 

41 self._fh = open(filename, 'r') 

42 

43 self._open = True 

44 regexp = r'^ITEM: (TIMESTEP|NUMBER OF ATOMS|BOX BOUNDS|ATOMS) ?(.*)$' 

45 self._item_re = re.compile(regexp) 

46 

47 self._first_called = False 

48 self._frame_index = count(0) 

49 

50 # setup units 

51 if length_unit not in self.lengthunits_to_nm_table: 51 ↛ 52line 51 didn't jump to line 52, because the condition on line 51 was never true

52 raise ValueError(f'Specified length unit {length_unit} is not an available option.') 

53 else: 

54 self.x_factor = self.lengthunits_to_nm_table[length_unit] 

55 if time_unit not in self.timeunits_to_fs_table: 55 ↛ 56line 55 didn't jump to line 56, because the condition on line 55 was never true

56 raise ValueError(f'Specified time unit {time_unit} is not an available option.') 

57 else: 

58 self.t_factor = self.timeunits_to_fs_table[time_unit] 

59 self.v_factor = self.x_factor / self.t_factor 

60 

61 # ITEM: TIMESTEP 

62 # 81000 

63 # ITEM: NUMBER OF ATOMS 

64 # 1536 

65 # ITEM: BOX BOUNDS pp pp pp 

66 # 1.54223 26.5378 

67 # 1.54223 26.5378 

68 # 1.54223 26.5378 

69 # ITEM: ATOMS id type x y z vx vy vz 

70 # 247 1 3.69544 2.56202 3.27701 0.00433856 -0.00099307 -0.00486166 

71 # 249 2 3.73324 3.05962 4.14359 0.00346029 0.00332502 -0.00731005 

72 # 463 1 3.5465 4.12841 5.34888 0.000523332 0.00145597 -0.00418675 

73 

74 def _read_frame_header(self): 

75 while True: 

76 L = self._fh.readline() 

77 m = self._item_re.match(L) 

78 if not m: 

79 if L == '': 79 ↛ 83line 79 didn't jump to line 83, because the condition on line 79 was never false

80 self._fh.close() 

81 self._open = False 

82 raise StopIteration 

83 if L.strip() == '': 

84 continue 

85 raise IOError('TRJ_reader: Failed to parse TRJ frame header') 

86 if m.group(1) == 'TIMESTEP': 

87 step = int(self._fh.readline()) 

88 elif m.group(1) == 'NUMBER OF ATOMS': 

89 n_atoms = int(self._fh.readline()) 

90 elif m.group(1) == 'BOX BOUNDS': 

91 bbounds = [deque(map(float, self._fh.readline().split())) 

92 for _ in range(3)] 

93 x = array(bbounds) 

94 cell = np.diag(x[:, 1] - x[:, 0]) 

95 if x.shape == (3, 3): 95 ↛ 96line 95 didn't jump to line 96, because the condition on line 95 was never true

96 cell[1, 0] = x[0, 2] 

97 cell[2, 0] = x[1, 2] 

98 cell[2, 1] = x[2, 2] 

99 elif x.shape != (3, 2): 99 ↛ 100line 99 didn't jump to line 100, because the condition on line 99 was never true

100 raise IOError('TRJ_reader: Malformed cell bounds') 

101 elif m.group(1) == 'ATOMS': 101 ↛ 76line 101 didn't jump to line 76, because the condition on line 101 was never false

102 cols = tuple(m.group(2).split()) 

103 # At this point, there should be only atomic data left 

104 return (step, n_atoms, cell, cols) 

105 

106 def _get_first(self): 

107 # Read first frame, update state of self, create indeces etc 

108 step, N, cell, cols = self._read_frame_header() 

109 self._n_atoms = N 

110 self._step = step 

111 self._cols = cols 

112 self._cell = cell 

113 

114 def _all_in_cols(keys): 

115 for k in keys: 

116 if k not in cols: 

117 return False 

118 return True 

119 

120 self._x_map = None 

121 if _all_in_cols(('id', 'xu', 'yu', 'zu')): 121 ↛ 122line 121 didn't jump to line 122, because the condition on line 121 was never true

122 self._x_I = array(deque(map(cols.index, ('xu', 'yu', 'zu')))) 

123 elif _all_in_cols(('id', 'x', 'y', 'z')): 123 ↛ 125line 123 didn't jump to line 125, because the condition on line 123 was never false

124 self._x_I = array(deque(map(cols.index, ('x', 'y', 'z')))) 

125 elif _all_in_cols(('id', 'xs', 'ys', 'zs')): 

126 self._x_I = array(deque(map(cols.index, ('xs', 'ys', 'zs')))) 

127 _x_factor = self._cell.diagonal() 

128 # xs.shape == (n, 3) 

129 self._x_map = lambda xs: xs * _x_factor 

130 else: 

131 raise RuntimeError('TRJ file must contain at least atom-id, x, y,' 

132 ' and z coordinates to be useful.') 

133 self._id_I = cols.index('id') 

134 

135 if _all_in_cols(('vx', 'vy', 'vz')): 

136 self._v_I = array(deque(map(cols.index, ('vx', 'vy', 'vz')))) 

137 else: 

138 self._v_I = None 

139 

140 if 'type' in cols: 140 ↛ 143line 140 didn't jump to line 143, because the condition on line 140 was never false

141 self._type_I = cols.index('type') 

142 else: 

143 self._type_I = None 

144 

145 data = array([list(map(float, self._fh.readline().split())) for _ in range(N)]) 

146 # data.shape == (N, Ncols) 

147 II = np.asarray(data[:, self._id_I], dtype=np.int_) 

148 # Unless dump is done for group 'all' ... 

149 II[np.argsort(II)] = arange(len(II)) 

150 self._x = zeros((N, 3)) 

151 if self._x_map is None: 151 ↛ 154line 151 didn't jump to line 154, because the condition on line 151 was never false

152 self._x[II] = data[:, self._x_I] 

153 else: 

154 self._x[II] = self._x_map(data[:, self._x_I]) 

155 if self._v_I is not None: 

156 self._v = zeros((N, 3)) 

157 self._v[II] = data[:, self._v_I] 

158 

159 self._first_called = True 

160 

161 def _get_next(self): 

162 # get next frame, update state of self 

163 step, N, cell, cols = self._read_frame_header() 

164 assert self._n_atoms == N 

165 assert self._cols == cols 

166 self._step = step 

167 self._cell = cell 

168 

169 data = array([deque(map(float, self._fh.readline().split())) 

170 for _ in range(N)]) 

171 II = np.asarray(data[:, self._id_I], dtype=np.int_) - 1 

172 if self._x_map is None: 172 ↛ 175line 172 didn't jump to line 175, because the condition on line 172 was never false

173 self._x[II] = data[:, self._x_I] 

174 else: 

175 self._x[II] = self._x_map(data[:, self._x_I]) 

176 if self._v_I is not None: 

177 self._v[II] = data[:, self._v_I] 

178 

179 def __iter__(self): 

180 return self 

181 

182 def close(self): 

183 if not self._fh.closed: 

184 self._fh.close() 

185 

186 def __next__(self): 

187 if not self._open: 187 ↛ 188line 187 didn't jump to line 188, because the condition on line 187 was never true

188 raise StopIteration 

189 

190 if self._first_called: 

191 self._get_next() 

192 else: 

193 self._get_first() 

194 

195 if self._v_I is not None: 

196 frame = ReaderFrame(frame_index=next(self._frame_index), 

197 n_atoms=int(self._n_atoms), 

198 cell=self.x_factor * self._cell.copy('F'), 

199 positions=self.x_factor * self._x, 

200 velocities=self.v_factor * self._v 

201 ) 

202 else: 

203 frame = ReaderFrame(frame_index=next(self._frame_index), 

204 n_atoms=int(self._n_atoms), 

205 cell=self.x_factor * self._cell.copy('F'), 

206 positions=self.x_factor * self._x 

207 ) 

208 

209 return frame