Coverage for dynasor / trajectory / lammps_trajectory_reader.py: 82%
119 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 12:31 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 12:31 +0000
1import numpy as np
2import re
4from collections import deque
5from typing import Optional
6from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader
7from dynasor.trajectory.trajectory_frame import ReaderFrame
8from itertools import count
9from numpy import array, arange, zeros
12class LammpsTrajectoryReader(AbstractTrajectoryReader):
13 """Read LAMMPS trajectory file
15 This is a naive (and comparatively slow) implementation,
16 written entirely in python.
18 Parameters
19 ----------
20 filename
21 Name of input file.
22 length_unit
23 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``).
24 time_unit
25 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``).
26 """
28 def __init__(
29 self,
30 filename: str,
31 length_unit: Optional[str] = None,
32 time_unit: Optional[str] = None
33 ):
35 if filename.endswith('.gz'): 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true
36 import gzip
37 self._fh = gzip.open(filename, 'rt')
38 elif filename.endswith('.bz2'): 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true
39 import bz2
40 self._fh = bz2.open(filename, 'rt')
41 else:
42 self._fh = open(filename, 'r')
44 self._open = True
45 regexp = r'^ITEM: (TIMESTEP|NUMBER OF ATOMS|BOX BOUNDS|ATOMS) ?(.*)$'
46 self._item_re = re.compile(regexp)
48 self._first_called = False
49 self._frame_index = count(0)
51 # set up units
52 self.set_unit_scaling_factors(length_unit, time_unit)
54 # ITEM: TIMESTEP
55 # 81000
56 # ITEM: NUMBER OF ATOMS
57 # 1536
58 # ITEM: BOX BOUNDS pp pp pp
59 # 1.54223 26.5378
60 # 1.54223 26.5378
61 # 1.54223 26.5378
62 # ITEM: ATOMS id type x y z vx vy vz
63 # 247 1 3.69544 2.56202 3.27701 0.00433856 -0.00099307 -0.00486166
64 # 249 2 3.73324 3.05962 4.14359 0.00346029 0.00332502 -0.00731005
65 # 463 1 3.5465 4.12841 5.34888 0.000523332 0.00145597 -0.00418675
67 def _read_frame_header(self):
68 while True:
69 L = self._fh.readline()
70 m = self._item_re.match(L)
71 if not m:
72 if L == '':
73 self._fh.close()
74 self._open = False
75 raise StopIteration
76 if L.strip() == '': 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true
77 continue
78 raise IOError('TRJ_reader: Failed to parse TRJ frame header')
79 if m.group(1) == 'TIMESTEP':
80 step = int(self._fh.readline())
81 elif m.group(1) == 'NUMBER OF ATOMS':
82 n_atoms = int(self._fh.readline())
83 elif m.group(1) == 'BOX BOUNDS':
84 bbounds = [deque(map(float, self._fh.readline().split()))
85 for _ in range(3)]
86 x = array(bbounds)
87 cell = np.diag(x[:, 1] - x[:, 0])
88 if x.shape == (3, 3): 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 cell[1, 0] = x[0, 2]
90 cell[2, 0] = x[1, 2]
91 cell[2, 1] = x[2, 2]
92 elif x.shape != (3, 2): 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true
93 raise IOError('TRJ_reader: Malformed cell bounds')
94 elif m.group(1) == 'ATOMS': 94 ↛ 68line 94 didn't jump to line 68 because the condition on line 94 was always true
95 cols = tuple(m.group(2).split())
96 # At this point, there should be only atomic data left
97 return (step, n_atoms, cell, cols)
99 def _get_first(self):
100 # Read first frame, update state of self, create indeces etc
101 step, N, cell, cols = self._read_frame_header()
102 self._n_atoms = N
103 self._step = step
104 self._cols = cols
105 self._cell = cell
107 def _all_in_cols(keys):
108 for k in keys:
109 if k not in cols:
110 return False
111 return True
113 self._x_map = None
114 if _all_in_cols(('id', 'xu', 'yu', 'zu')): 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true
115 self._x_I = array(deque(map(cols.index, ('xu', 'yu', 'zu'))))
116 elif _all_in_cols(('id', 'x', 'y', 'z')): 116 ↛ 118line 116 didn't jump to line 118 because the condition on line 116 was always true
117 self._x_I = array(deque(map(cols.index, ('x', 'y', 'z'))))
118 elif _all_in_cols(('id', 'xs', 'ys', 'zs')):
119 self._x_I = array(deque(map(cols.index, ('xs', 'ys', 'zs'))))
120 _x_factor = self._cell.diagonal()
121 # xs.shape == (n, 3)
122 self._x_map = lambda xs: xs * _x_factor
123 else:
124 raise RuntimeError('TRJ file must contain at least atom-id, x, y,'
125 ' and z coordinates to be useful.')
126 self._id_I = cols.index('id')
128 if _all_in_cols(('vx', 'vy', 'vz')):
129 self._v_I = array(deque(map(cols.index, ('vx', 'vy', 'vz'))))
130 else:
131 self._v_I = None
133 if 'type' in cols: 133 ↛ 136line 133 didn't jump to line 136 because the condition on line 133 was always true
134 self._type_I = cols.index('type')
135 else:
136 self._type_I = None
138 data = array([list(map(float, self._fh.readline().split())) for _ in range(N)])
139 # data.shape == (N, Ncols)
140 II = np.asarray(data[:, self._id_I], dtype=np.int_)
141 # Unless dump is done for group 'all' ...
142 II[np.argsort(II)] = arange(len(II))
143 self._x = zeros((N, 3))
144 if self._x_map is None: 144 ↛ 147line 144 didn't jump to line 147 because the condition on line 144 was always true
145 self._x[II] = data[:, self._x_I]
146 else:
147 self._x[II] = self._x_map(data[:, self._x_I])
148 if self._v_I is not None:
149 self._v = zeros((N, 3))
150 self._v[II] = data[:, self._v_I]
152 self._first_called = True
154 def _get_next(self):
155 # get next frame, update state of self
156 step, N, cell, cols = self._read_frame_header()
157 assert self._n_atoms == N
158 assert self._cols == cols
159 self._step = step
160 self._cell = cell
162 data = array([deque(map(float, self._fh.readline().split()))
163 for _ in range(N)])
164 II = np.asarray(data[:, self._id_I], dtype=np.int_) - 1
165 if self._x_map is None: 165 ↛ 168line 165 didn't jump to line 168 because the condition on line 165 was always true
166 self._x[II] = data[:, self._x_I]
167 else:
168 self._x[II] = self._x_map(data[:, self._x_I])
169 if self._v_I is not None:
170 self._v[II] = data[:, self._v_I]
172 def __iter__(self):
173 return self
175 def close(self):
176 if not self._fh.closed:
177 self._fh.close()
179 def __next__(self):
180 if not self._open:
181 raise StopIteration
183 if self._first_called:
184 self._get_next()
185 else:
186 self._get_first()
188 if self._v_I is not None:
189 frame = ReaderFrame(frame_index=next(self._frame_index),
190 n_atoms=int(self._n_atoms),
191 cell=self.x_factor * self._cell.copy('F'),
192 positions=self.x_factor * self._x,
193 velocities=self.v_factor * self._v
194 )
195 else:
196 frame = ReaderFrame(frame_index=next(self._frame_index),
197 n_atoms=int(self._n_atoms),
198 cell=self.x_factor * self._cell.copy('F'),
199 positions=self.x_factor * self._x
200 )
202 return frame