Coverage for local_installation/dynasor/trajectory/lammps_trajectory_reader.py: 77%
123 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-12-21 12:02 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-12-21 12:02 +0000
1import numpy as np
2import re
4from collections import deque
5from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader
6from dynasor.trajectory.trajectory_frame import ReaderFrame
7from itertools import count
8from numpy import array, arange, zeros
11class LammpsTrajectoryReader(AbstractTrajectoryReader):
12 """Read LAMMPS trajectory file
14 This is a naive (and comparatively slow) implementation,
15 written entirely in python.
17 Parameters
18 ----------
19 filename
20 Name of input file.
21 length_unit
22 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``).
23 time_unit
24 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``).
25 """
27 def __init__(
28 self,
29 filename: str,
30 length_unit: str = 'Angstrom',
31 time_unit: str = 'fs'
32 ):
34 if filename.endswith('.gz'): 34 ↛ 35line 34 didn't jump to line 35, because the condition on line 34 was never true
35 import gzip
36 self._fh = gzip.open(filename, 'rt')
37 elif filename.endswith('.bz2'): 37 ↛ 38line 37 didn't jump to line 38, because the condition on line 37 was never true
38 import bz2
39 self._fh = bz2.open(filename, 'rt')
40 else:
41 self._fh = open(filename, 'r')
43 self._open = True
44 regexp = r'^ITEM: (TIMESTEP|NUMBER OF ATOMS|BOX BOUNDS|ATOMS) ?(.*)$'
45 self._item_re = re.compile(regexp)
47 self._first_called = False
48 self._frame_index = count(0)
50 # setup units
51 if length_unit not in self.lengthunits_to_nm_table: 51 ↛ 52line 51 didn't jump to line 52, because the condition on line 51 was never true
52 raise ValueError(f'Specified length unit {length_unit} is not an available option.')
53 else:
54 self.x_factor = self.lengthunits_to_nm_table[length_unit]
55 if time_unit not in self.timeunits_to_fs_table: 55 ↛ 56line 55 didn't jump to line 56, because the condition on line 55 was never true
56 raise ValueError(f'Specified time unit {time_unit} is not an available option.')
57 else:
58 self.t_factor = self.timeunits_to_fs_table[time_unit]
59 self.v_factor = self.x_factor / self.t_factor
61 # ITEM: TIMESTEP
62 # 81000
63 # ITEM: NUMBER OF ATOMS
64 # 1536
65 # ITEM: BOX BOUNDS pp pp pp
66 # 1.54223 26.5378
67 # 1.54223 26.5378
68 # 1.54223 26.5378
69 # ITEM: ATOMS id type x y z vx vy vz
70 # 247 1 3.69544 2.56202 3.27701 0.00433856 -0.00099307 -0.00486166
71 # 249 2 3.73324 3.05962 4.14359 0.00346029 0.00332502 -0.00731005
72 # 463 1 3.5465 4.12841 5.34888 0.000523332 0.00145597 -0.00418675
74 def _read_frame_header(self):
75 while True:
76 L = self._fh.readline()
77 m = self._item_re.match(L)
78 if not m:
79 if L == '': 79 ↛ 83line 79 didn't jump to line 83, because the condition on line 79 was never false
80 self._fh.close()
81 self._open = False
82 raise StopIteration
83 if L.strip() == '':
84 continue
85 raise IOError('TRJ_reader: Failed to parse TRJ frame header')
86 if m.group(1) == 'TIMESTEP':
87 step = int(self._fh.readline())
88 elif m.group(1) == 'NUMBER OF ATOMS':
89 n_atoms = int(self._fh.readline())
90 elif m.group(1) == 'BOX BOUNDS':
91 bbounds = [deque(map(float, self._fh.readline().split()))
92 for _ in range(3)]
93 x = array(bbounds)
94 cell = np.diag(x[:, 1] - x[:, 0])
95 if x.shape == (3, 3): 95 ↛ 96line 95 didn't jump to line 96, because the condition on line 95 was never true
96 cell[1, 0] = x[0, 2]
97 cell[2, 0] = x[1, 2]
98 cell[2, 1] = x[2, 2]
99 elif x.shape != (3, 2): 99 ↛ 100line 99 didn't jump to line 100, because the condition on line 99 was never true
100 raise IOError('TRJ_reader: Malformed cell bounds')
101 elif m.group(1) == 'ATOMS': 101 ↛ 76line 101 didn't jump to line 76, because the condition on line 101 was never false
102 cols = tuple(m.group(2).split())
103 # At this point, there should be only atomic data left
104 return (step, n_atoms, cell, cols)
106 def _get_first(self):
107 # Read first frame, update state of self, create indeces etc
108 step, N, cell, cols = self._read_frame_header()
109 self._n_atoms = N
110 self._step = step
111 self._cols = cols
112 self._cell = cell
114 def _all_in_cols(keys):
115 for k in keys:
116 if k not in cols:
117 return False
118 return True
120 self._x_map = None
121 if _all_in_cols(('id', 'xu', 'yu', 'zu')): 121 ↛ 122line 121 didn't jump to line 122, because the condition on line 121 was never true
122 self._x_I = array(deque(map(cols.index, ('xu', 'yu', 'zu'))))
123 elif _all_in_cols(('id', 'x', 'y', 'z')): 123 ↛ 125line 123 didn't jump to line 125, because the condition on line 123 was never false
124 self._x_I = array(deque(map(cols.index, ('x', 'y', 'z'))))
125 elif _all_in_cols(('id', 'xs', 'ys', 'zs')):
126 self._x_I = array(deque(map(cols.index, ('xs', 'ys', 'zs'))))
127 _x_factor = self._cell.diagonal()
128 # xs.shape == (n, 3)
129 self._x_map = lambda xs: xs * _x_factor
130 else:
131 raise RuntimeError('TRJ file must contain at least atom-id, x, y,'
132 ' and z coordinates to be useful.')
133 self._id_I = cols.index('id')
135 if _all_in_cols(('vx', 'vy', 'vz')):
136 self._v_I = array(deque(map(cols.index, ('vx', 'vy', 'vz'))))
137 else:
138 self._v_I = None
140 if 'type' in cols: 140 ↛ 143line 140 didn't jump to line 143, because the condition on line 140 was never false
141 self._type_I = cols.index('type')
142 else:
143 self._type_I = None
145 data = array([list(map(float, self._fh.readline().split())) for _ in range(N)])
146 # data.shape == (N, Ncols)
147 II = np.asarray(data[:, self._id_I], dtype=np.int_)
148 # Unless dump is done for group 'all' ...
149 II[np.argsort(II)] = arange(len(II))
150 self._x = zeros((N, 3))
151 if self._x_map is None: 151 ↛ 154line 151 didn't jump to line 154, because the condition on line 151 was never false
152 self._x[II] = data[:, self._x_I]
153 else:
154 self._x[II] = self._x_map(data[:, self._x_I])
155 if self._v_I is not None:
156 self._v = zeros((N, 3))
157 self._v[II] = data[:, self._v_I]
159 self._first_called = True
161 def _get_next(self):
162 # get next frame, update state of self
163 step, N, cell, cols = self._read_frame_header()
164 assert self._n_atoms == N
165 assert self._cols == cols
166 self._step = step
167 self._cell = cell
169 data = array([deque(map(float, self._fh.readline().split()))
170 for _ in range(N)])
171 II = np.asarray(data[:, self._id_I], dtype=np.int_) - 1
172 if self._x_map is None: 172 ↛ 175line 172 didn't jump to line 175, because the condition on line 172 was never false
173 self._x[II] = data[:, self._x_I]
174 else:
175 self._x[II] = self._x_map(data[:, self._x_I])
176 if self._v_I is not None:
177 self._v[II] = data[:, self._v_I]
179 def __iter__(self):
180 return self
182 def close(self):
183 if not self._fh.closed:
184 self._fh.close()
186 def __next__(self):
187 if not self._open: 187 ↛ 188line 187 didn't jump to line 188, because the condition on line 187 was never true
188 raise StopIteration
190 if self._first_called:
191 self._get_next()
192 else:
193 self._get_first()
195 if self._v_I is not None:
196 frame = ReaderFrame(frame_index=next(self._frame_index),
197 n_atoms=int(self._n_atoms),
198 cell=self.x_factor * self._cell.copy('F'),
199 positions=self.x_factor * self._x,
200 velocities=self.v_factor * self._v
201 )
202 else:
203 frame = ReaderFrame(frame_index=next(self._frame_index),
204 n_atoms=int(self._n_atoms),
205 cell=self.x_factor * self._cell.copy('F'),
206 positions=self.x_factor * self._x
207 )
209 return frame