Coverage for dynasor / trajectory / extxyz_trajectory_reader.py: 98%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 12:31 +0000

import concurrent.futures
from itertools import count
from typing import Iterator, Optional

import numpy as np
from ase import Atoms
from ase.io.extxyz import ixyzchunks

from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader
from dynasor.trajectory.trajectory_frame import ReaderFrame

9 

10 

def chunk_to_atoms(chunk):
    """Build an ASE ``Atoms`` object from a parsed xyz chunk.

    Kept as a module-level function so it can be pickled and dispatched
    to worker processes.
    """
    return chunk.build()

14 

15 

def iread(f, max_workers: Optional[int] = None) -> Iterator[Atoms]:
    """Read extxyz frames in parallel using multiple processes.

    File chunks are parsed into :class:`ase.Atoms` objects in a process
    pool while results are yielded in file order, keeping the pool
    saturated with one pending task per worker.

    Parameters
    ----------
    f
        File object.
    max_workers
        Maximum number of processes; ``None`` lets the executor pick the
        number of processors on the machine.

    Yields
    ------
    The next :class:`ase.Atoms` object in the trajectory.
    """
    # chunks are simple (picklable) objects, so they can be shipped to workers
    chunk_iterator = iter(ixyzchunks(f))

    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as ex:

        def submit_next(buff):
            """Submit one more chunk to the pool, if any input remains.

            RuntimeError (incomplete trailing chunk) and StopIteration
            (end of file) both simply stop feeding the pipeline.
            """
            try:
                chunk = next(chunk_iterator)
            except (RuntimeError, StopIteration):
                return
            buff.append(ex.submit(chunk_to_atoms, chunk))

        # prime the pipeline with one task per worker
        # (NOTE: _max_workers is private, but the executor exposes no
        # public accessor for the resolved worker count)
        buff = []
        for _ in range(ex._max_workers):
            submit_next(buff)

        # pop completed-first futures in submission order so frames come
        # out in file order; top up the pool before blocking on a result
        while buff:
            res = buff.pop(0)
            submit_next(buff)
            yield res.result()

58 

59 

class ExtxyzTrajectoryReader(AbstractTrajectoryReader):
    """Read extended xyz trajectory files, typically produced by GPUMD.

    This is a naive (and comparatively slow) parallel implementation which
    relies on the ASE xyz reader.

    Parameters
    ----------
    filename
        Name of input file.
    length_unit
        Unit of length for the input trajectory
        (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``).
    time_unit
        Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``).
    max_workers
        Number of working processes; defaults to ``None``, which means that
        the number of processors on the machine is used.
    """

    def __init__(self,
                 filename: str,
                 length_unit: Optional[str] = None,
                 time_unit: Optional[str] = None,
                 max_workers: Optional[int] = None):

        # setup generator object over the frames of the file
        self._fobj = open(filename, 'r')
        self._generator_xyz = iread(self._fobj, max_workers=max_workers)
        self._open = True
        self._frame_index = count(0)

        # set up units
        self.set_unit_scaling_factors(length_unit, time_unit)

    def _get_next(self):
        """Fetch the next frame from the generator into instance attributes.

        Closes the file and raises ``StopIteration`` when the trajectory
        is exhausted. Deliberately broad: any failure while reading is
        treated as end-of-trajectory so iteration terminates cleanly.
        """
        try:
            atoms = next(self._generator_xyz)
        except Exception:
            self._fobj.close()
            self._open = False
            raise StopIteration

        self._atom_types = np.array(list(atoms.symbols))
        self._n_atoms = len(atoms)
        self._cell = atoms.cell[:]
        self._x = atoms.positions
        # velocities are optional in extxyz files; None when absent
        self._v = atoms.arrays.get('vel')

    def __iter__(self):
        return self

    def close(self):
        """Close the underlying file and mark the reader as exhausted."""
        if not self._fobj.closed:
            self._fobj.close()
        # always mark closed, even if the file object was already closed
        self._open = False

    def __next__(self):
        if not self._open:
            raise StopIteration

        self._get_next()

        # build the frame once; velocities are only included when present
        kwargs = dict(
            frame_index=next(self._frame_index),
            n_atoms=int(self._n_atoms),
            cell=self.x_factor * self._cell.copy('F'),
            positions=self.x_factor * self._x,
            atom_types=self._atom_types,
        )
        if self._v is not None:
            kwargs['velocities'] = self.v_factor * self._v
        return ReaderFrame(**kwargs)