Coverage for dynasor / trajectory / extxyz_trajectory_reader.py: 98%
71 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-03-16 12:31 +0000
import concurrent.futures
from collections.abc import Iterator
from itertools import count
from typing import Optional

import numpy as np
from ase import Atoms
from ase.io.extxyz import ixyzchunks

from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader
from dynasor.trajectory.trajectory_frame import ReaderFrame
def chunk_to_atoms(chunk):
    """Build an :class:`ase.Atoms` object from a parsed xyz chunk.

    Defined at module level so it is picklable and can be dispatched
    to worker processes by :func:`iread`.
    """
    return chunk.build()
def iread(f, max_workers: Optional[int] = None) -> Iterator[Atoms]:
    """Read extxyz frames in parallel using multiple processes.

    File chunks are parsed into :class:`ase.Atoms` objects by a pool of
    worker processes while results are yielded in file order.

    Parameters
    ----------
    f
        File object.
    max_workers
        Maximum number of processes; ``None`` lets the executor use the
        number of processors on the machine.

    Yields
    ------
    One :class:`ase.Atoms` object per frame, in file order.
    """
    # chunks are simple (picklable) objects, so they can be shipped to workers
    chunk_iterator = iter(ixyzchunks(f))

    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as ex:

        def _submit_next(buff):
            # Submit one more chunk for parsing, if any remain.
            # NOTE(review): RuntimeError is swallowed like end-of-input here,
            # presumably for a malformed trailing chunk -- confirm against
            # ase.io.extxyz.ixyzchunks behavior.
            try:
                chunk = next(chunk_iterator)
                buff.append(ex.submit(chunk_to_atoms, chunk))
            except (RuntimeError, StopIteration):
                pass

        # prime the pipeline with one task per worker
        # (there is no public accessor for the worker count)
        buff = []
        for _ in range(ex._max_workers):
            _submit_next(buff)

        # keep the pool saturated: top up with one new task for each
        # finished one; popping from the front preserves file order
        while buff:
            res = buff.pop(0)
            _submit_next(buff)
            yield res.result()
class ExtxyzTrajectoryReader(AbstractTrajectoryReader):
    """Read extended xyz trajectory file, typically produced by GPUMD.

    This is a naive (and comparatively slow) parallel implementation which
    relies on the ASE xyz reader.

    Parameters
    ----------
    filename
        Name of input file.
    length_unit
        Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``).
    time_unit
        Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``).
    max_workers
        Number of working processes; defaults to ``None``, which means that the number of
        processors on the machine is used.
    """

    def __init__(self,
                 filename: str,
                 length_unit: Optional[str] = None,
                 time_unit: Optional[str] = None,
                 max_workers: Optional[int] = None):
        # setup generator object over the frames of the file
        self._fobj = open(filename, 'r')
        self._generator_xyz = iread(self._fobj, max_workers=max_workers)
        self._open = True
        self._frame_index = count(0)

        # set up unit conversion factors (x_factor, v_factor, ...)
        self.set_unit_scaling_factors(length_unit, time_unit)

    def _get_next(self):
        """Advance the underlying generator and cache the frame data.

        Closes the file and raises :class:`StopIteration` when the
        generator is exhausted (or fails).
        """
        # NOTE(review): the broad except also converts parse errors into
        # normal end-of-iteration; narrowing it could change termination
        # behavior for malformed files -- kept as is.
        try:
            atoms = next(self._generator_xyz)
        except Exception:
            self._fobj.close()
            self._open = False
            raise StopIteration

        self._atom_types = np.array(list(atoms.symbols))
        self._n_atoms = len(atoms)
        self._cell = atoms.cell[:]
        self._x = atoms.positions
        # velocities are optional in extxyz files
        if 'vel' in atoms.arrays:
            self._v = atoms.arrays['vel']
        else:
            self._v = None

    def __iter__(self):
        return self

    def close(self):
        """Close the input file and mark the reader as exhausted."""
        if not self._fobj.closed:
            self._fobj.close()
        self._open = False

    def __next__(self):
        """Return the next frame as a :class:`ReaderFrame`, in input units
        scaled by the reader's conversion factors."""
        if not self._open:
            raise StopIteration

        self._get_next()

        # build the frame once; velocities are only included when present
        kwargs = dict(
            frame_index=next(self._frame_index),
            n_atoms=int(self._n_atoms),
            cell=self.x_factor * self._cell.copy('F'),
            positions=self.x_factor * self._x,
            atom_types=self._atom_types,
        )
        if self._v is not None:
            kwargs['velocities'] = self.v_factor * self._v
        return ReaderFrame(**kwargs)