Coverage for dynasor / trajectory / extxyz_trajectory_reader.py: 90%
88 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 06:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 06:28 +0000
1import concurrent.futures
2import numpy as np
3import io
4from itertools import count
5from typing import Optional
6from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader
7from dynasor.trajectory.trajectory_frame import ReaderFrame
8from ase.io import read
9from ase import Atoms
12def frame_text_to_atoms(frame_text):
13 return read(io.StringIO(frame_text), format='extxyz')
16def iter_frame_texts(f):
17 while True:
18 natoms_line = f.readline()
19 if not natoms_line:
20 break
22 comment_line = f.readline()
23 if not comment_line: 23 ↛ 24line 23 didn't jump to line 24 because the condition on line 23 was never true
24 raise ValueError('Unexpected EOF while reading extxyz comment line')
26 try:
27 natoms = int(natoms_line.strip())
28 except ValueError:
29 raise ValueError(f'Invalid extxyz atom count line: {natoms_line!r}')
31 atom_lines = []
32 for _ in range(natoms):
33 line = f.readline()
34 if not line: 34 ↛ 35line 34 didn't jump to line 35 because the condition on line 34 was never true
35 raise ValueError('Unexpected EOF while reading extxyz atom lines')
36 atom_lines.append(line)
38 yield natoms_line + comment_line + ''.join(atom_lines)
41def iread(f, max_workers: Optional[int] = None) -> Atoms:
42 frame_iterator = iter_frame_texts(f)
43 with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as ex:
44 buff = []
45 n_submit = ex._max_workers
47 for _ in range(n_submit):
48 try:
49 frame_text = next(frame_iterator)
50 buff.append(ex.submit(frame_text_to_atoms, frame_text))
51 except StopIteration:
52 break
54 while buff:
55 res = buff.pop(0)
57 try:
58 frame_text = next(frame_iterator)
59 buff.append(ex.submit(frame_text_to_atoms, frame_text))
60 except StopIteration:
61 pass
63 yield res.result()
66class ExtxyzTrajectoryReader(AbstractTrajectoryReader):
67 """Read extend xyz trajectory file, typically produced by GPUMD
69 This is a naive (and comparatively slow) parallel implementation which
70 relies on the ASE xyz reader.
72 Parameters
73 ----------
74 filename
75 Name of input file.
76 length_unit
77 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``).
78 time_unit
79 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``).
80 max_workers
81 Number of working processes; defaults to ``None``, which means that the number of
82 processors on the machine is used.
83 """
85 def __init__(self,
86 filename: str,
87 length_unit: Optional[str] = None,
88 time_unit: Optional[str] = None,
89 max_workers: Optional[int] = None):
91 # setup generator object
92 self._fobj = open(filename, 'r')
93 self._generator_xyz = iread(self._fobj, max_workers=max_workers)
94 self._open = True
95 self._frame_index = count(0)
97 # set up units
98 self.set_unit_scaling_factors(length_unit, time_unit)
100 def _get_next(self):
101 try:
102 atoms = next(self._generator_xyz)
103 except StopIteration:
104 self._fobj.close()
105 self._open = False
106 raise
107 except Exception:
108 self._fobj.close()
109 self._open = False
110 raise
112 self._atom_types = np.array(list(atoms.symbols))
113 self._n_atoms = len(atoms)
114 self._cell = atoms.cell[:]
115 self._x = atoms.positions
116 if 'vel' in atoms.arrays:
117 self._v = atoms.arrays['vel']
118 else:
119 self._v = None
121 def __iter__(self):
122 return self
124 def close(self):
125 if not self._fobj.closed:
126 self._fobj.close()
127 self._open = False
129 def __next__(self):
130 if not self._open:
131 raise StopIteration
133 self._get_next()
135 if self._v is not None:
136 frame = ReaderFrame(frame_index=next(self._frame_index),
137 n_atoms=int(self._n_atoms),
138 cell=self.x_factor * self._cell.copy('F'),
139 positions=self.x_factor * self._x,
140 velocities=self.v_factor * self._v,
141 atom_types=self._atom_types
142 )
143 else:
144 frame = ReaderFrame(frame_index=next(self._frame_index),
145 n_atoms=int(self._n_atoms),
146 cell=self.x_factor * self._cell.copy('F'),
147 positions=self.x_factor * self._x,
148 atom_types=self._atom_types
149 )
150 return frame