Coverage for local_installation/dynasor/trajectory/trajectory.py: 94%

158 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2025-04-16 06:13 +0000

1__all__ = ['Trajectory', 'WindowIterator'] 

2 

3import numpy as np 

4 

5from collections import deque 

6from itertools import islice, chain 

7from os.path import isfile 

8from typing import Callable, Dict, Union, List 

9 

10from dynasor.trajectory.atomic_indices import parse_gromacs_index_file 

11from dynasor.trajectory.ase_trajectory_reader import ASETrajectoryReader 

12from dynasor.trajectory.extxyz_trajectory_reader import ExtxyzTrajectoryReader 

13from dynasor.trajectory.lammps_trajectory_reader import LammpsTrajectoryReader 

14from dynasor.trajectory.mdanalysis_trajectory_reader import MDAnalysisTrajectoryReader 

15from dynasor.trajectory.trajectory_frame import TrajectoryFrame 

16from dynasor.logging_tools import logger 

17 

18 

19class Trajectory: 

20 """Instances of this class hold trajectories in a format suitable for 

21 the computation of correlation functions. They behave as 

22 iterators, where each step returns the next frame as a 

23 :class:`TrajectoryFrame` object. The latter hold information 

24 regarding atomic positions, types, and velocities. 

25 

26 Parameters 

27 ---------- 

28 filename 

29 Name of input file 

30 trajectory_format 

31 Type of trajectory. Possible values are: 

32 ``'lammps_internal'``, ``'extxyz'``, ``'ase'`` or one of the formats supported by 

33 `MDAnalysis <https://www.mdanalysis.org/>`_ (except for ``'lammpsdump'``, 

34 which can be called by specifying ``'lammps_mdanalysis'`` to avoid ambiguity) 

35 atomic_indices 

36 Specify which indices belong to which atom type. Can be 

37 (1) a dictionary where the keys specicy species and the values are list of atomic indices, 

38 (2) ``'read_from_trajectory'``, in which case the species are read from the trajectory or 

39 (3) the path to an index file. 

40 length_unit 

41 Length unit of trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``). Necessary for 

42 correct conversion to internal dynasor units if the trajectory file does not contain unit 

43 information. 

44 time_unit 

45 Time unit of trajectory (``'fs'``, ``'ps'``, ``'ns'``). Necessary for correct conversion to 

46 internal dynasor units if the trajectory file does not contain unit information. 

47 frame_start 

48 First frame to read; must be larger or equal ``0``. 

49 frame_stop 

50 Last frame to read. By default (``None``) the entire trajectory is read. 

51 frame_step 

52 Read every :attr:`frame_step`-th step of the input trajectory. 

53 By default (``1``) every frame is read. Must be larger than ``0``. 

54 

55 """ 

56 def __init__( 

57 self, 

58 filename: str, 

59 trajectory_format: str, 

60 atomic_indices: Union[str, Dict[str, List[int]]] = None, 

61 length_unit: str = 'Angstrom', 

62 time_unit: str = 'fs', 

63 frame_start: int = 0, 

64 frame_stop: int = None, 

65 frame_step: int = 1 

66 ): 

67 

68 if frame_start < 0: 

69 raise ValueError('frame_start should be positive') 

70 if frame_step < 0: 

71 raise ValueError('frame_step should be positive') 

72 

73 self._frame_start = frame_start 

74 self._frame_step = frame_step 

75 self._frame_stop = frame_stop 

76 

77 # setup trajectory reader 

78 if not isfile(filename): 

79 raise IOError(f'File {filename} does not exist') 

80 self._filename = filename 

81 

82 if trajectory_format == 'lammps_internal': 

83 reader = LammpsTrajectoryReader 

84 elif trajectory_format == 'extxyz': 

85 reader = ExtxyzTrajectoryReader 

86 elif trajectory_format == 'lammps_mdanalysis': 

87 reader = MDAnalysisTrajectoryReader 

88 trajectory_format = 'lammpsdump' 

89 elif trajectory_format == 'ase': 89 ↛ 90line 89 didn't jump to line 90, because the condition on line 89 was never true

90 reader = ASETrajectoryReader 

91 elif trajectory_format == 'lammps': 

92 raise IOError('Ambiguous trajectory format, ' 

93 'did you mean lammps_internal or lammps_mdanalysis?') 

94 else: 

95 reader = MDAnalysisTrajectoryReader 

96 

97 logger.debug(f'Using trajectory reader: {reader.__name__}') 

98 if reader == MDAnalysisTrajectoryReader: 

99 self._reader_obj = reader(self._filename, trajectory_format, 

100 length_unit=length_unit, time_unit=time_unit) 

101 else: 

102 self._reader_obj = reader(self._filename, length_unit=length_unit, time_unit=time_unit) 

103 

104 # Get two frames to set cell etc. 

105 frame0 = next(self._reader_obj) 

106 frame1 = next(self._reader_obj) 

107 self._cell = frame0.cell 

108 self._n_atoms = frame0.n_atoms 

109 

110 # Make sure cell is not changed during consecutive frames 

111 if not np.allclose(frame0.cell, frame1.cell): 

112 raise ValueError('The cell changes between the first and second frame. ' 

113 'The concept of q-points becomes muddy if the simulation cell is ' 

114 'changing, such as during NPT MD simulations, so trajectories where ' 

115 'the cell changes are not supported by dynasor.') 

116 

117 # setup iterator slice (reuse frame0 and frame1 via chain) 

118 self.number_of_frames_read = 0 

119 self.current_frame_index = 0 

120 self._reader_iter = islice(chain([frame0, frame1], self._reader_obj), 

121 self._frame_start, self._frame_stop, self._frame_step) 

122 

123 # setup atomic indices 

124 if atomic_indices is None: # Default behaviour 

125 atomic_indices = {'X': np.arange(0, self.n_atoms)} 

126 elif isinstance(atomic_indices, str): # Str input 

127 if atomic_indices == 'read_from_trajectory': 

128 if frame0.atom_types is None: 

129 raise ValueError('Could not read atomic indices from the trajectory.') 

130 else: 

131 uniques = np.unique(frame0.atom_types) 

132 atomic_indices = {str(uniques[i]): 

133 (frame0.atom_types == uniques[i]).nonzero()[0] 

134 for i in range(len(uniques))} 

135 else: 

136 atomic_indices = parse_gromacs_index_file(atomic_indices) 

137 elif isinstance(atomic_indices, dict): # Dict input 

138 pass 

139 else: 

140 raise ValueError('Could not understand atomic_indices.') 

141 self._atomic_indices = atomic_indices 

142 

143 # sanity checks for atomic_indices 

144 for key, indices in self._atomic_indices.items(): 

145 if np.max(indices) > self.n_atoms: 

146 raise ValueError('maximum index in atomic_indices exceeds number of atoms') 

147 if np.min(indices) < 0: 

148 raise ValueError('minimum index in atomic_indices is negative') 

149 if '_' in key: 

150 # Since '_' is what we use to distinguish atom types in the results, e.g. Sqw_Cs_Pb 

151 raise ValueError('The char "_" is not allowed in atomic_indices.') 

152 

153 # log info on trajectory and atom types etc 

154 logger.info(f'Trajectory file: {self.filename}') 

155 logger.info(f'Total number of particles: {self.n_atoms}') 

156 logger.info(f'Number of atom types: {len(self.atom_types)}') 

157 for atom_type, indices in self._atomic_indices.items(): 

158 logger.info(f'Number of atoms of type {atom_type}: {len(indices)}') 

159 logger.info(f'Simulation cell (in Angstrom):\n{str(self._cell)}') 

160 

161 def __iter__(self): 

162 return self 

163 

164 def __next__(self): 

165 frame = next(self._reader_iter) 

166 new_frame = TrajectoryFrame(self.atomic_indices, frame.frame_index, frame.positions, 

167 frame.velocities) 

168 self.number_of_frames_read += 1 

169 self.current_frame_index = frame.frame_index 

170 return new_frame 

171 

172 def __str__(self) -> str: 

173 s = ['Trajectory'] 

174 s += ['{:12} : {}'.format('filename', self.filename)] 

175 s += ['{:12} : {}'.format('natoms', self.n_atoms)] 

176 s += ['{:12} : {}'.format('frame_start', self._frame_start)] 

177 s += ['{:12} : {}'.format('frame_stop', self._frame_stop)] 

178 s += ['{:12} : {}'.format('frame_step', self.frame_step)] 

179 s += ['{:12} : {}'.format('frame_index', self.current_frame_index)] 

180 s += ['{:12} : [{}\n {}\n {}]' 

181 .format('cell', self.cell[0], self.cell[1], self.cell[2])] 

182 return '\n'.join(s) 

183 

184 def __repr__(self) -> str: 

185 return str(self) 

186 

187 def _repr_html_(self) -> str: 

188 s = [f'<h3>{self.__class__.__name__}</h3>'] 

189 s += ['<table border="1" class="dataframe">'] 

190 s += ['<thead><tr><th style="text-align: left;">Field</th><th>Value</th></tr></thead>'] 

191 s += ['<tbody>'] 

192 s += [f'<tr"><td style="text-align: left;">File name</td><td>{self.filename}</td></tr>'] 

193 s += [f'<tr><td style="text-align: left;">Number of atoms</td><td>{self.n_atoms}</td></tr>'] 

194 s += [f'<tr><td style="text-align: left;">Cell metric</td><td>{self.cell}</td></tr>'] 

195 s += [f'<tr><td style="text-align: left;">Frame step</td><td>{self.frame_step}</td></tr>'] 

196 s += [f'<tr><td style="text-align: left;">Atom types</td><td>{self.atom_types}</td></tr>'] 

197 s += ['</tbody>'] 

198 s += ['</table>'] 

199 return '\n'.join(s) 

200 

201 @property 

202 def cell(self): 

203 """ Simulation cell """ 

204 return self._cell 

205 

206 @property 

207 def n_atoms(self): 

208 """ Number of atoms """ 

209 return self._n_atoms 

210 

211 @property 

212 def filename(self): 

213 """ The trajectory filename """ 

214 return self._filename 

215 

216 @property 

217 def atomic_indices(self): 

218 """ Return copy of index arrays """ 

219 atomic_indices = dict() 

220 for name, inds in self._atomic_indices.items(): 

221 atomic_indices[name] = inds.copy() 

222 return atomic_indices 

223 

224 @property 

225 def atom_types(self) -> List[str]: 

226 return sorted(self._atomic_indices.keys()) 

227 

228 @property 

229 def frame_step(self): 

230 """ Frame to access, trajectory will return every :attr:`frame_step`-th snapshot """ 

231 return self._frame_step 

232 

233 

234def consume(iterator, n): 

235 """ Advance the iterator by :attr:`n` steps. If :attr:`n` is ``None``, consume entirely. """ 

236 # From the python.org 

237 if n is None: 237 ↛ 238line 237 didn't jump to line 238, because the condition on line 237 was never true

238 deque(iterator, maxlen=0) 

239 else: 

240 next(islice(iterator, n, n), None) 

241 

242 

243class WindowIterator: 

244 """Sliding window iterator. 

245 

246 Returns consecutive windows (a window is represented as a list 

247 of objects), created from an input iterator. 

248 

249 Parameters 

250 ---------- 

251 itraj 

252 Trajectory object 

253 width 

254 Length of window (``window_size`` + 1) 

255 window_step 

256 Distance between the start of two consecutive window frames 

257 element_processor 

258 Enables processing each non-discarded object; useful if ``window_step > 

259 width`` and ``map_item`` is expensive (as compared to directly passing 

260 ``map(fun, itraj)`` as ``itraj``); if ``window_step < width``, you could as 

261 well directly pass ``map(fun, itraj)``. 

262 """ 

263 def __init__(self, 

264 itraj: Trajectory, 

265 width: int, 

266 window_step: int = 1, 

267 element_processor: Callable = None): 

268 

269 self._raw_it = itraj 

270 if element_processor: 

271 self._it = map(element_processor, self._raw_it) 

272 else: 

273 self._it = self._raw_it 

274 assert window_step >= 1 

275 assert width >= 1 

276 self.width = width 

277 self.window_step = window_step 

278 self._window = None 

279 

280 def __iter__(self): 

281 return self 

282 

283 def __next__(self): 

284 """ Returns next element in sequence. """ 

285 if self._window is None: 

286 self._window = deque(islice(self._it, self.width), self.width) 

287 else: 

288 if self.window_step >= self.width: 

289 self._window.clear() 

290 consume(self._raw_it, self.window_step - self.width) 

291 else: 

292 for _ in range(min((self.window_step, len(self._window)))): 

293 self._window.popleft() 

294 for f in islice(self._it, min((self.window_step, self.width))): 

295 self._window.append(f) 

296 

297 if len(self._window) == 0: 

298 raise StopIteration 

299 

300 return list(self._window)