Coverage for dynasor / sample.py: 88%

171 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 12:31 +0000

1import getpass 

2import socket 

3from copy import deepcopy 

4from datetime import datetime 

5from typing import Any, Optional 

6import numpy as np 

7from dynasor.logging_tools import logger 

8from numpy.typing import NDArray 

9from pandas import DataFrame 

10 

11 

class Sample:
    """
    Class for holding correlation functions and additional metadata.
    Objects of this class are most commonly generated by calling functions such
    as :func:`compute_static_structure_factors` or
    :func:`compute_dynamic_structure_factors`.
    They can then be written to and subsequently read from file.

    You can see which correlation functions are available via the
    :attr:`available_correlation_functions` property.
    You can then access the correlation functions either by key or as property.
    For example, you could access the static structure factor in the following ways::

        sample.Sq     # as property
        sample['Sq']  # via key

    The correlation functions are provided as numpy arrays.

    There are several additional fields, the availability of which depends on the
    type of correlation function that was sampled. Static samples, for example, do
    not contain the `time` and `omega` fields. `q_norms` is only available if spherical
    averaging was carried out, typically via :func:`get_spherically_averaged_sample_smearing`.

    * `q_points`: list of q-point coordinates
    * `q_norms`: norms of the momentum vector
    * `time`: time
    * `omega`: frequency

    You can also see which fields are available by "printing" the :class:`Sample` object.

    Parameters
    ----------
    data_dict
        Dictionary with correlation functions. Every key becomes an attribute
        of the object.
    simulation_data
        Dictionary with simulation data. The following fields are strongly encouraged
        (but not enforced): `atom_types`, `cell`, `particle_counts`.
    history
        Previous history of operations on :class:`Sample` object.
    """

    def __init__(
        self,
        data_dict: dict[str, Any],
        simulation_data: dict[str, Any],
        history: Optional[list[dict[str, Any]]] = None,
    ):
        # Expose every entry of the data dict as an attribute and remember the
        # keys, so that the data can be enumerated and written to file later.
        self._data_keys = list(data_dict)
        for key in data_dict:
            setattr(self, key, data_dict[key])

        # set metadata
        # (using deepcopy here to avoid accidental transfer by reference)
        self._metadata = dict()
        self._metadata['simulation_data'] = deepcopy(simulation_data)
        if history is not None:
            logger.debug('Copying history')
            self._metadata['history'] = deepcopy(history)
        else:
            self._metadata['history'] = []

    def _append_history(
        self,
        calling_function: str,
        caller_metadata: Optional[dict[str, Any]] = None,
    ):
        """Add record to history.

        The record contains the name of the calling function, the caller
        metadata (if any), and a time stamp, user name, host name, and the
        dynasor version.

        Parameters
        ----------
        calling_function
            Name of calling function.
        caller_metadata
            Metadata associated with the calling function.
        """
        from dynasor import __version__ as dynasor_version

        new_record = dict(func=calling_function)
        if caller_metadata is not None:
            new_record.update(caller_metadata)
        # Added last, so these entries take precedence over equally named
        # entries in the caller metadata.
        new_record.update(dict(
            date_time=datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
            username=getpass.getuser(),
            hostname=socket.gethostname(),
            dynasor_version=dynasor_version,
        ))
        self._metadata['history'].append(new_record)

    def __getitem__(self, key):
        """ Makes it possible to get the attributes using Sample['key'] """
        try:
            return getattr(self, key)
        except AttributeError:
            # Suppress the AttributeError context: for item access only the
            # KeyError is meaningful to the caller.
            raise KeyError(key) from None

    def write_to_npz(self, fname: str):
        """ Write object to file in numpy npz format.

        The file contains the class name (`name`), the metadata (`metadata`),
        and the data (`data_dict`).

        Parameters
        ----------
        fname
            Name of the file in which to store the Sample object.
        """
        data_to_save = dict(name=self.__class__.__name__)
        data_to_save['metadata'] = self._metadata
        data_to_save['data_dict'] = {key: getattr(self, key) for key in self._data_keys}
        np.savez_compressed(fname, **data_to_save)

    @property
    def available_correlation_functions(self) -> list[str]:
        """ All the available correlation functions in sample. """
        keys_to_skip = {'q_points', 'q_norms', 'time', 'omega'}
        return sorted(set(self._data_keys) - keys_to_skip)

    @property
    def dimensions(self) -> list[str]:
        r"""The dimensions for the samples, e.g., for :math:`S(q, \omega)`
        the dimensions would be the :math:`q` and :math:`\omega` axes.
        """
        keys_to_skip = set(self.available_correlation_functions)
        return sorted(set(self._data_keys) - keys_to_skip)

    @property
    def metadata(self) -> dict[str, Any]:
        """ Metadata (returned as a deep copy). """
        return deepcopy(self._metadata)

    @property
    def simulation_data(self) -> dict[str, Any]:
        """ Simulation data (returned as a deep copy). """
        return deepcopy(self._metadata['simulation_data'])

    @property
    def history(self) -> list[dict[str, Any]]:
        """ List of operations applied to this :class:`Sample` object. """
        return deepcopy(self._metadata['history'])

    @property
    def atom_types(self) -> Optional[list[str]]:
        """ Simulation data: Atom types (`None` if not available). """
        # Take a single snapshot; every access to simulation_data deep-copies.
        sim_data = self.simulation_data
        return sim_data['atom_types'].copy() if 'atom_types' in sim_data else None

    @property
    def particle_counts(self) -> Optional[dict[str, int]]:
        """ Simulation data: Number of particles per type (`None` if not available). """
        sim_data = self.simulation_data
        return sim_data['particle_counts'].copy() if 'particle_counts' in sim_data else None

    @property
    def pairs(self) -> Optional[list[tuple[str, str]]]:
        """ Pairs of types for which correlation functions are available
        (`None` if not available). """
        sim_data = self.simulation_data
        return sim_data['pairs'].copy() if 'pairs' in sim_data else None

    @property
    def cell(self) -> Optional[NDArray[float]]:
        """ Simulation data: Cell metric (`None` if not available). """
        sim_data = self.simulation_data
        return sim_data['cell'].copy() if 'cell' in sim_data else None

    @property
    def has_incoherent(self) -> bool:
        """ Whether this sample contains the incoherent correlation functions or not. """
        return False

    @property
    def has_currents(self) -> bool:
        """ Whether this sample contains the current correlation functions or not. """
        return False

    def __repr__(self) -> str:
        return str(self)

    # Human-readable labels for selected simulation data fields.
    _special_fields = dict(
        atom_types='Atom types',
        cell='Cell',
        number_of_frames='Number of frames',
        particle_counts='Particle counts',
        maximum_time_lag='Maximum time lag',
        angular_frequency_resolution='Angular frequency resolution',
        time_between_frames='Time between frames',
    )

    def __str__(self) -> str:
        """ Plain-text summary: class name, simulation data, and the shapes of
        all dimensions and correlation functions. """
        s_contents = [self.__class__.__name__]
        for key, value in sorted(self.simulation_data.items()):
            s_contents.append(f'{self._special_fields.get(key, key)}: {value}')
        for key in self.dimensions + self.available_correlation_functions:
            s_contents.append(f'{key:15} with shape: {np.shape(getattr(self, key))}')
        return '\n'.join(s_contents)

    def _repr_html_(self) -> str:
        """ HTML summary with three tables (simulation data, dimensions, and
        history); the method name follows the IPython rich-display convention.

        Only simulation data fields listed in `_special_fields` are shown.
        """
        from html import escape

        def esc(value: Any) -> str:
            # Contents are arbitrary objects; escape their text so that
            # characters such as '<' or '&' are shown rather than parsed as markup.
            return escape(str(value), quote=False)

        s = [f'<h3>{self.__class__.__name__}</h3>']

        s += ['<h4>Simulation</h4>']
        s += ['<table border="1" class="dataframe">']
        s += ['<thead><tr>'
              '<th style="text-align: left">Name</th>'
              '<th>Content</th>'
              '</tr></thead>']
        s += ['<tbody>']
        for key, value in sorted(self.simulation_data.items()):
            if key not in self._special_fields:
                continue
            s += [f'<tr><td style="text-align: left;">{self._special_fields[key]}</td>'
                  f'<td>{esc(value)}</td></tr>']
        s += ['</tbody>']
        s += ['</table>']

        s += ['<h4>Dimensions</h4>']
        s += ['<table border="1" class="dataframe">']
        s += ['<thead><tr>'
              '<th style="text-align: left">Field</th>'
              '<th>Size</th>'
              '</tr></thead>']
        s += ['<tbody>']
        for key in self.dimensions:
            s += [f'<tr><td style="text-align: left">{esc(key)}</td>'
                  f'<td>{np.shape(getattr(self, key))}</td></tr>']
        s += ['</tbody>']
        s += ['</table>']

        s += ['<h4>History</h4>']
        s += ['<table border="1" class="dataframe">']
        s += ['<thead><tr>'
              '<th style="text-align: left">Function</th>'
              '<th style="text-align: left">Field</th>'
              '<th style="text-align: left">Content</th>'
              '</tr></thead>']
        s += ['<tbody>']
        for entry in self.history:
            title = esc(entry.get('func', ''))
            for key, value in entry.items():
                if key == 'func':
                    continue
                s += [f'<tr><td style="text-align: left">{title}</td>'
                      f'<td style="text-align: left">{esc(key)}</td>'
                      f'<td>{esc(value)}</td></tr>']
                # The function name is only shown in the first row of each record.
                title = ''
        s += ['</tbody>']
        s += ['</table>']
        return '\n'.join(s)

267 

268 

class StaticSample(Sample):
    """
    Container for static correlation functions and their metadata.
    Objects of this class are most commonly generated by calling
    :func:`compute_static_structure_factors <dynasor.compute_static_structure_factors>`.
    They can then be written to and subsequently read from file.

    The :attr:`available_correlation_functions` property lists the correlation
    functions that are available. Each of them can be accessed either by key
    or as property. For example, you could access the static structure factor
    :math:`S(q)` in the following ways::

        sample.Sq     # as property
        sample['Sq']  # via key

    The correlation functions are provided as numpy arrays.

    There are several additional fields:

    * `q_points`: list of q-point coordinates
    * `q_norms`: norms of the momentum vector (available when the :class:`StaticSample`
      object was generated via :func:`get_spherically_averaged_sample_smearing
      <dynasor.post_processing.get_spherically_averaged_sample_smearing>` or similar
      functions)

    You can also see which fields are available by "printing" the :class:`StaticSample` object.

    Parameters
    ----------
    data_dict
        Dictionary with correlation functions.
    simulation_data
        Dictionary with simulation data. The following fields are strongly encouraged
        (but not enforced): `atom_types`, `cell`, `particle_counts`.
    history
        Previous history of operations on :class:`Sample` object.
    """

    def to_dataframe(self) -> DataFrame:
        """ Returns correlation functions as pandas dataframe

        The dataframe has one column per dimension followed by one column per
        correlation function.
        """
        frame = DataFrame()
        for name in self.dimensions:
            # Converted to a list so that q-points of shape (N, 3) fit into a
            # single dataframe column.
            values = self[name]
            frame[name] = values.tolist()
        for name in self.available_correlation_functions:
            # Each correlation function is flattened to one dimension.
            frame[name] = self[name].reshape(-1)
        return frame

316 

317 

class DynamicSample(Sample):
    r"""
    Class for holding dynamic correlation functions and additional metadata.
    Objects of this class are most commonly generated by calling
    :func:`compute_dynamic_structure_factors <dynasor.compute_dynamic_structure_factors>`.
    They can then be written to and subsequently read from file.

    You can see which correlation functions are available via the
    :attr:`available_correlation_functions` property.
    You can then access the correlation functions either by key or as property.
    For example, you could access the dynamic structure factor :math:`S(q,\omega)`
    in the following ways::

        sample.Sqw     # as property
        sample['Sqw']  # via key

    The correlation functions are provided as numpy arrays.

    There are several additional fields, the availability of which depends on the
    type of correlation function that was sampled.

    * `q_points`: list of q-point coordinates
    * `q_norms`: norms of the momentum vector (available when the :class:`DynamicSample`
      object was generated, e.g., via :func:`get_spherically_averaged_sample_smearing
      <dynasor.post_processing.get_spherically_averaged_sample_smearing>` or similar
      functions)
    * `time`: time
    * `omega`: frequency

    You can also see which fields are available by "printing" the :class:`DynamicSample` object.

    Parameters
    ----------
    data_dict
        Dictionary with correlation functions.
    simulation_data
        Dictionary with simulation data. The following fields are strongly encouraged
        (but not enforced): `atom_types`, `cell`, `particle_counts`.
    history
        Previous history of operations on :class:`Sample` object.
    """

    @property
    def has_incoherent(self) -> bool:
        """ Whether this sample contains the incoherent correlation functions,
        i.e., whether `Fqt_incoh` is among the available correlation functions. """
        return 'Fqt_incoh' in self.available_correlation_functions

    @property
    def has_currents(self) -> bool:
        """ Whether this sample contains the current correlation functions.

        This is decided by checking whether `Clqt_<pair>` is available for the
        first pair of types. If the simulation data contains no pairs,
        `False` is returned.
        """
        pairs = self.pairs
        # `pairs` is None if the simulation data lacks the field; without any
        # pair there is no key to look for.
        if pairs is None or len(pairs) == 0:
            return False
        pair_string = '_'.join(pairs[0])
        return f'Clqt_{pair_string}' in self.available_correlation_functions

    def to_dataframe(self, q_index: int) -> DataFrame:
        """ Returns correlation functions as pandas dataframe for the given q-index.

        The q-point related dimensions (`q_points` and `q_norms`) are not
        included as columns since the dataframe refers to a single q-point.

        Parameters
        ----------
        q_index
            Index of q-point to return.
        """
        df = DataFrame()
        for dim in self.dimensions:
            if dim in ('q_points', 'q_norms'):
                continue
            df[dim] = self[dim]
        for key in self.available_correlation_functions:
            df[key] = self[key][q_index]
        return df

385 

386 

def read_sample_from_npz(fname: str) -> Sample:
    """ Read :class:`Sample <dynasor.sample.Sample>` from file.

    The class of the returned object (:class:`StaticSample`,
    :class:`DynamicSample`, or :class:`Sample`) is selected based on the
    `name` entry stored in the file.

    Parameters
    ----------
    fname
        Path to the file (numpy npz format) from which to read
        the :class:`Sample <dynasor.sample.Sample>` object.
    """
    # NOTE(security): allow_pickle=True is needed since metadata and data are
    # stored as pickled dictionaries. Unpickling can execute arbitrary code,
    # hence only files from trusted sources should be read.
    # The context manager makes sure that the underlying file is closed.
    with np.load(fname, allow_pickle=True) as data_read:
        try:
            metadata = data_read['metadata'].item()
        except KeyError:
            # fallback for versions<=2.2
            metadata = data_read['meta_data'].item()
        data_dict = data_read['data_dict'].item()
        # Stored as a zero-dimensional array; unwrap to compare plain values.
        class_name = data_read['name'].item()

    if 'simulation_data' in metadata:
        logger.debug(f'Reading Sample object from {fname} assuming version >=2.3')
        simulation_data = metadata['simulation_data']
    else:
        # Older files keep the simulation data at the top level of the metadata.
        logger.debug(f'Reading Sample object from {fname} assuming version <=2.2')
        legacy_keys = [
            'atom_types', 'pairs', 'particle_counts', 'cell',
            'time_between_frames', 'maximum_time_lag', 'angular_frequency_resolution',
            'maximum_angular_frequency', 'number_of_frames',
        ]
        simulation_data = {key: metadata[key] for key in legacy_keys if key in metadata}

    history = metadata['history'] if 'history' in metadata else None

    if class_name == 'StaticSample':
        return StaticSample(data_dict, simulation_data, history=history)
    elif class_name == 'DynamicSample':
        return DynamicSample(data_dict, simulation_data, history=history)
    else:
        return Sample(data_dict, simulation_data, history=history)