Coverage for dynasor / trajectory / extxyz_trajectory_reader.py: 90%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 06:28 +0000

1import concurrent.futures 

2import numpy as np 

3import io 

4from itertools import count 

5from typing import Optional 

6from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader 

7from dynasor.trajectory.trajectory_frame import ReaderFrame 

8from ase.io import read 

9from ase import Atoms 

10 

11 

12def frame_text_to_atoms(frame_text): 

13 return read(io.StringIO(frame_text), format='extxyz') 

14 

15 

16def iter_frame_texts(f): 

17 while True: 

18 natoms_line = f.readline() 

19 if not natoms_line: 

20 break 

21 

22 comment_line = f.readline() 

23 if not comment_line: 23 ↛ 24line 23 didn't jump to line 24 because the condition on line 23 was never true

24 raise ValueError('Unexpected EOF while reading extxyz comment line') 

25 

26 try: 

27 natoms = int(natoms_line.strip()) 

28 except ValueError: 

29 raise ValueError(f'Invalid extxyz atom count line: {natoms_line!r}') 

30 

31 atom_lines = [] 

32 for _ in range(natoms): 

33 line = f.readline() 

34 if not line: 34 ↛ 35line 34 didn't jump to line 35 because the condition on line 34 was never true

35 raise ValueError('Unexpected EOF while reading extxyz atom lines') 

36 atom_lines.append(line) 

37 

38 yield natoms_line + comment_line + ''.join(atom_lines) 

39 

40 

41def iread(f, max_workers: Optional[int] = None) -> Atoms: 

42 frame_iterator = iter_frame_texts(f) 

43 with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as ex: 

44 buff = [] 

45 n_submit = ex._max_workers 

46 

47 for _ in range(n_submit): 

48 try: 

49 frame_text = next(frame_iterator) 

50 buff.append(ex.submit(frame_text_to_atoms, frame_text)) 

51 except StopIteration: 

52 break 

53 

54 while buff: 

55 res = buff.pop(0) 

56 

57 try: 

58 frame_text = next(frame_iterator) 

59 buff.append(ex.submit(frame_text_to_atoms, frame_text)) 

60 except StopIteration: 

61 pass 

62 

63 yield res.result() 

64 

65 

66class ExtxyzTrajectoryReader(AbstractTrajectoryReader): 

67 """Read extend xyz trajectory file, typically produced by GPUMD 

68 

69 This is a naive (and comparatively slow) parallel implementation which 

70 relies on the ASE xyz reader. 

71 

72 Parameters 

73 ---------- 

74 filename 

75 Name of input file. 

76 length_unit 

77 Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``). 

78 time_unit 

79 Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``). 

80 max_workers 

81 Number of working processes; defaults to ``None``, which means that the number of 

82 processors on the machine is used. 

83 """ 

84 

85 def __init__(self, 

86 filename: str, 

87 length_unit: Optional[str] = None, 

88 time_unit: Optional[str] = None, 

89 max_workers: Optional[int] = None): 

90 

91 # setup generator object 

92 self._fobj = open(filename, 'r') 

93 self._generator_xyz = iread(self._fobj, max_workers=max_workers) 

94 self._open = True 

95 self._frame_index = count(0) 

96 

97 # set up units 

98 self.set_unit_scaling_factors(length_unit, time_unit) 

99 

100 def _get_next(self): 

101 try: 

102 atoms = next(self._generator_xyz) 

103 except StopIteration: 

104 self._fobj.close() 

105 self._open = False 

106 raise 

107 except Exception: 

108 self._fobj.close() 

109 self._open = False 

110 raise 

111 

112 self._atom_types = np.array(list(atoms.symbols)) 

113 self._n_atoms = len(atoms) 

114 self._cell = atoms.cell[:] 

115 self._x = atoms.positions 

116 if 'vel' in atoms.arrays: 

117 self._v = atoms.arrays['vel'] 

118 else: 

119 self._v = None 

120 

121 def __iter__(self): 

122 return self 

123 

124 def close(self): 

125 if not self._fobj.closed: 

126 self._fobj.close() 

127 self._open = False 

128 

129 def __next__(self): 

130 if not self._open: 

131 raise StopIteration 

132 

133 self._get_next() 

134 

135 if self._v is not None: 

136 frame = ReaderFrame(frame_index=next(self._frame_index), 

137 n_atoms=int(self._n_atoms), 

138 cell=self.x_factor * self._cell.copy('F'), 

139 positions=self.x_factor * self._x, 

140 velocities=self.v_factor * self._v, 

141 atom_types=self._atom_types 

142 ) 

143 else: 

144 frame = ReaderFrame(frame_index=next(self._frame_index), 

145 n_atoms=int(self._n_atoms), 

146 cell=self.x_factor * self._cell.copy('F'), 

147 positions=self.x_factor * self._x, 

148 atom_types=self._atom_types 

149 ) 

150 return frame