Coverage for local_installation/dynasor/trajectory/extxyz_trajectory_reader.py: 91%

74 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-06-13 16:00 +0000

1import concurrent.futures 

2from ase.io.extxyz import ixyzchunks 

3from dynasor.trajectory.abstract_trajectory_reader import AbstractTrajectoryReader 

4from dynasor.trajectory.trajectory_frame import ReaderFrame 

5from itertools import count 

6import numpy as np 

7 

8 

def chunk_to_atoms(chunk):
    """Build a chunk and return the result.

    This is a thin module-level wrapper around ``chunk.build()``; it is the
    callable that :func:`iread` submits to its process pool.

    Parameters
    ----------
    chunk
        Object exposing a ``build()`` method (here: the items yielded by
        ``ixyzchunks``).

    Returns
    -------
    Whatever ``chunk.build()`` returns.
    """
    return chunk.build()

12 

13 

def iread(f, max_workers=None):
    """Read extxyz frames in parallel using a pool of worker processes.

    Chunks are pulled from ``ixyzchunks(f)`` in the calling process and handed
    to a ``ProcessPoolExecutor`` where :func:`chunk_to_atoms` builds them.
    At most one task per worker is kept in flight, and results are yielded
    in the same order in which the chunks were read.

    Parameters
    ----------
    f
        Open file object passed on to ``ixyzchunks``.
    max_workers
        Number of worker processes; ``None`` lets ``ProcessPoolExecutor``
        pick its default.

    Yields
    ------
    The result of ``chunk.build()`` for each chunk, in file order.

    Raises
    ------
    Exception
        Any exception raised while building a chunk in a worker is re-raised
        here by ``Future.result()``. Errors raised by ``submit`` itself (e.g.
        a broken process pool) now propagate immediately instead of being
        silently discarded.
    """

    # chunks are simple objects
    chunk_iterator = iter(ixyzchunks(f))
    exhausted = object()  # sentinel: no more chunks available

    def next_chunk():
        # Only the fetch from the iterator is guarded, so a RuntimeError
        # coming from the executor is never mistaken for end-of-input.
        # NOTE(review): RuntimeError is presumably how the end of the file
        # surfaces from the ASE chunk generator (a StopIteration escaping a
        # generator body is converted to RuntimeError, PEP 479) - confirm.
        try:
            return next(chunk_iterator)
        except (RuntimeError, StopIteration):
            return exhausted

    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as ex:

        # Prime the pool with one task per worker. ``_max_workers`` is a
        # private attribute of the executor; it is read here because it holds
        # the pool size that was actually chosen when ``max_workers`` is None.
        pending = []
        for _ in range(ex._max_workers):
            chunk = next_chunk()
            if chunk is exhausted:
                break
            pending.append(ex.submit(chunk_to_atoms, chunk))

        while pending:
            # Oldest future first, which preserves the frame order.
            res = pending.pop(0)

            # Refill the pool before blocking on the result so that the
            # workers stay busy while the caller processes this frame.
            chunk = next_chunk()
            if chunk is not exhausted:
                pending.append(ex.submit(chunk_to_atoms, chunk))

            yield res.result()

48 

49 

class ExtxyzTrajectoryReader(AbstractTrajectoryReader):
    """Read extended xyz trajectory file, typically produced by GPUMD

    This is a naive (and comparatively slow) parallel implementation which
    relies on the ASE xyz reader.

    The reader is an iterator: each call to ``next`` returns a
    ``ReaderFrame`` with cell and positions scaled by the length conversion
    factor and, if the frame carries a ``'vel'`` array, velocities scaled by
    the velocity conversion factor.

    Parameters
    ----------
    filename
        Name of input file.
    length_unit
        Unit of length for the input trajectory (``'Angstrom'``, ``'nm'``, ``'pm'``, ``'fm'``).
    time_unit
        Unit of time for the input trajectory (``'fs'``, ``'ps'``, ``'ns'``).
    max_workers
        Number of working processes; defaults to ``None``, which means that the number of
        processors on the machine is used.

    Raises
    ------
    ValueError
        If ``length_unit`` or ``time_unit`` is not a key of the respective
        conversion table.
    """

    def __init__(self,
                 filename: str,
                 length_unit: str = 'Angstrom',
                 time_unit: str = 'fs',
                 max_workers: int = None):

        # setup generator object
        # (iread is a generator function: no worker processes are started
        # until the first frame is requested)
        self._fobj = open(filename, 'r')
        self._generator_xyz = iread(self._fobj, max_workers=max_workers)
        self._open = True
        self._frame_index = count(0)

        # setup units
        # The conversion tables are looked up on ``self``; they are not
        # defined in this class (presumably inherited - verify against
        # AbstractTrajectoryReader).
        try:
            if length_unit not in self.lengthunits_to_nm_table:
                raise ValueError(
                    f'Specified length unit {length_unit} is not an available option.')
            if time_unit not in self.timeunits_to_fs_table:
                raise ValueError(
                    f'Specified time unit {time_unit} is not an available option.')
        except ValueError:
            # Do not leak the file handle opened above when validation fails.
            self.close()
            raise

        self.x_factor = self.lengthunits_to_nm_table[length_unit]
        self.t_factor = self.timeunits_to_fs_table[time_unit]
        self.v_factor = self.x_factor / self.t_factor

    def _get_next(self):
        """Fetch the next frame from the generator and cache its raw data.

        Raises
        ------
        StopIteration
            When the generator is exhausted or raises any other exception;
            the reader is closed in that case.
        """
        try:
            atoms = next(self._generator_xyz)
        except Exception:
            # NOTE(review): every exception, not only exhaustion, ends the
            # iteration silently here (e.g. a parse error in a truncated
            # frame). Kept as is because callers may rely on it - confirm.
            self.close()
            raise StopIteration

        self._atom_types = np.array(list(atoms.symbols))
        self._n_atoms = len(atoms)
        self._cell = atoms.cell[:]
        self._x = atoms.positions
        # Velocities are optional; they are taken from the 'vel' array.
        self._v = atoms.arrays['vel'] if 'vel' in atoms.arrays else None

    def __iter__(self):
        return self

    def close(self):
        """Release the worker pool (if started) and the input file.

        Safe to call more than once.
        """
        # Closing the generator exits the executor context inside iread, so
        # worker processes do not linger until garbage collection.
        self._generator_xyz.close()
        if not self._fobj.closed:
            self._fobj.close()
        self._open = False

    def __next__(self):
        """Return the next frame as a ``ReaderFrame`` in converted units.

        Raises
        ------
        StopIteration
            When the reader has been closed or no further frame is available.
        """
        if not self._open:
            raise StopIteration

        self._get_next()

        frame_kwargs = dict(
            frame_index=next(self._frame_index),
            n_atoms=int(self._n_atoms),
            cell=self.x_factor * self._cell.copy('F'),
            positions=self.x_factor * self._x,
        )
        # Only pass velocities when the frame provides them, so that
        # ReaderFrame falls back to its own default otherwise.
        if self._v is not None:
            frame_kwargs['velocities'] = self.v_factor * self._v
        frame_kwargs['atom_types'] = self._atom_types

        return ReaderFrame(**frame_kwargs)

138 return frame