Coverage for dynasor / trajectory / lammps_trajectory_reader.py: 82%

119 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 12:31 +0000

1import numpy as np 

2import re 

3 

4from collections import deque 

5from typing import Optional 

6from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader 

7from dynasor.trajectory.trajectory_frame import ReaderFrame 

8from itertools import count 

9from numpy import array, arange, zeros 

10 

11 

12class LammpsTrajectoryReader(AbstractTrajectoryReader): 

13 """Read LAMMPS trajectory file 

14 

15 This is a naive (and comparatively slow) implementation, 

16 written entirely in python. 

17 

18 Parameters 

19 ---------- 

20 filename 

21 Name of input file. 

22 length_unit 

23 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``). 

24 time_unit 

25 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``). 

26 """ 

27 

28 def __init__( 

29 self, 

30 filename: str, 

31 length_unit: Optional[str] = None, 

32 time_unit: Optional[str] = None 

33 ): 

34 

35 if filename.endswith('.gz'): 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true

36 import gzip 

37 self._fh = gzip.open(filename, 'rt') 

38 elif filename.endswith('.bz2'): 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true

39 import bz2 

40 self._fh = bz2.open(filename, 'rt') 

41 else: 

42 self._fh = open(filename, 'r') 

43 

44 self._open = True 

45 regexp = r'^ITEM: (TIMESTEP|NUMBER OF ATOMS|BOX BOUNDS|ATOMS) ?(.*)$' 

46 self._item_re = re.compile(regexp) 

47 

48 self._first_called = False 

49 self._frame_index = count(0) 

50 

51 # set up units 

52 self.set_unit_scaling_factors(length_unit, time_unit) 

53 

54 # ITEM: TIMESTEP 

55 # 81000 

56 # ITEM: NUMBER OF ATOMS 

57 # 1536 

58 # ITEM: BOX BOUNDS pp pp pp 

59 # 1.54223 26.5378 

60 # 1.54223 26.5378 

61 # 1.54223 26.5378 

62 # ITEM: ATOMS id type x y z vx vy vz 

63 # 247 1 3.69544 2.56202 3.27701 0.00433856 -0.00099307 -0.00486166 

64 # 249 2 3.73324 3.05962 4.14359 0.00346029 0.00332502 -0.00731005 

65 # 463 1 3.5465 4.12841 5.34888 0.000523332 0.00145597 -0.00418675 

66 

67 def _read_frame_header(self): 

68 while True: 

69 L = self._fh.readline() 

70 m = self._item_re.match(L) 

71 if not m: 

72 if L == '': 

73 self._fh.close() 

74 self._open = False 

75 raise StopIteration 

76 if L.strip() == '': 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true

77 continue 

78 raise IOError('TRJ_reader: Failed to parse TRJ frame header') 

79 if m.group(1) == 'TIMESTEP': 

80 step = int(self._fh.readline()) 

81 elif m.group(1) == 'NUMBER OF ATOMS': 

82 n_atoms = int(self._fh.readline()) 

83 elif m.group(1) == 'BOX BOUNDS': 

84 bbounds = [deque(map(float, self._fh.readline().split())) 

85 for _ in range(3)] 

86 x = array(bbounds) 

87 cell = np.diag(x[:, 1] - x[:, 0]) 

88 if x.shape == (3, 3): 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true

89 cell[1, 0] = x[0, 2] 

90 cell[2, 0] = x[1, 2] 

91 cell[2, 1] = x[2, 2] 

92 elif x.shape != (3, 2): 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true

93 raise IOError('TRJ_reader: Malformed cell bounds') 

94 elif m.group(1) == 'ATOMS': 94 ↛ 68line 94 didn't jump to line 68 because the condition on line 94 was always true

95 cols = tuple(m.group(2).split()) 

96 # At this point, there should be only atomic data left 

97 return (step, n_atoms, cell, cols) 

98 

99 def _get_first(self): 

100 # Read first frame, update state of self, create indeces etc 

101 step, N, cell, cols = self._read_frame_header() 

102 self._n_atoms = N 

103 self._step = step 

104 self._cols = cols 

105 self._cell = cell 

106 

107 def _all_in_cols(keys): 

108 for k in keys: 

109 if k not in cols: 

110 return False 

111 return True 

112 

113 self._x_map = None 

114 if _all_in_cols(('id', 'xu', 'yu', 'zu')): 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true

115 self._x_I = array(deque(map(cols.index, ('xu', 'yu', 'zu')))) 

116 elif _all_in_cols(('id', 'x', 'y', 'z')): 116 ↛ 118line 116 didn't jump to line 118 because the condition on line 116 was always true

117 self._x_I = array(deque(map(cols.index, ('x', 'y', 'z')))) 

118 elif _all_in_cols(('id', 'xs', 'ys', 'zs')): 

119 self._x_I = array(deque(map(cols.index, ('xs', 'ys', 'zs')))) 

120 _x_factor = self._cell.diagonal() 

121 # xs.shape == (n, 3) 

122 self._x_map = lambda xs: xs * _x_factor 

123 else: 

124 raise RuntimeError('TRJ file must contain at least atom-id, x, y,' 

125 ' and z coordinates to be useful.') 

126 self._id_I = cols.index('id') 

127 

128 if _all_in_cols(('vx', 'vy', 'vz')): 

129 self._v_I = array(deque(map(cols.index, ('vx', 'vy', 'vz')))) 

130 else: 

131 self._v_I = None 

132 

133 if 'type' in cols: 133 ↛ 136line 133 didn't jump to line 136 because the condition on line 133 was always true

134 self._type_I = cols.index('type') 

135 else: 

136 self._type_I = None 

137 

138 data = array([list(map(float, self._fh.readline().split())) for _ in range(N)]) 

139 # data.shape == (N, Ncols) 

140 II = np.asarray(data[:, self._id_I], dtype=np.int_) 

141 # Unless dump is done for group 'all' ... 

142 II[np.argsort(II)] = arange(len(II)) 

143 self._x = zeros((N, 3)) 

144 if self._x_map is None: 144 ↛ 147line 144 didn't jump to line 147 because the condition on line 144 was always true

145 self._x[II] = data[:, self._x_I] 

146 else: 

147 self._x[II] = self._x_map(data[:, self._x_I]) 

148 if self._v_I is not None: 

149 self._v = zeros((N, 3)) 

150 self._v[II] = data[:, self._v_I] 

151 

152 self._first_called = True 

153 

154 def _get_next(self): 

155 # get next frame, update state of self 

156 step, N, cell, cols = self._read_frame_header() 

157 assert self._n_atoms == N 

158 assert self._cols == cols 

159 self._step = step 

160 self._cell = cell 

161 

162 data = array([deque(map(float, self._fh.readline().split())) 

163 for _ in range(N)]) 

164 II = np.asarray(data[:, self._id_I], dtype=np.int_) - 1 

165 if self._x_map is None: 165 ↛ 168line 165 didn't jump to line 168 because the condition on line 165 was always true

166 self._x[II] = data[:, self._x_I] 

167 else: 

168 self._x[II] = self._x_map(data[:, self._x_I]) 

169 if self._v_I is not None: 

170 self._v[II] = data[:, self._v_I] 

171 

172 def __iter__(self): 

173 return self 

174 

175 def close(self): 

176 if not self._fh.closed: 

177 self._fh.close() 

178 

179 def __next__(self): 

180 if not self._open: 

181 raise StopIteration 

182 

183 if self._first_called: 

184 self._get_next() 

185 else: 

186 self._get_first() 

187 

188 if self._v_I is not None: 

189 frame = ReaderFrame(frame_index=next(self._frame_index), 

190 n_atoms=int(self._n_atoms), 

191 cell=self.x_factor * self._cell.copy('F'), 

192 positions=self.x_factor * self._x, 

193 velocities=self.v_factor * self._v 

194 ) 

195 else: 

196 frame = ReaderFrame(frame_index=next(self._frame_index), 

197 n_atoms=int(self._n_atoms), 

198 cell=self.x_factor * self._cell.copy('F'), 

199 positions=self.x_factor * self._x 

200 ) 

201 

202 return frame