Coverage for dynasor / sample.py: 88%
171 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 12:31 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 12:31 +0000
1import getpass
2import socket
3from copy import deepcopy
4from datetime import datetime
5from typing import Any, Optional
6import numpy as np
7from dynasor.logging_tools import logger
8from numpy.typing import NDArray
9from pandas import DataFrame
12class Sample:
13 """
14 Class for holding correlation functions and additional metadata.
15 Objects of this class are most commonly generated by calling functions such
16 as :func:`compute_static_structure_factors` or
17 :func:`compute_dynamic_structure_factors`.
18 They can then be written to and subsequently read from file.
20 You can see which correlation functions are available via the
21 :attr:`available_correlation_functions` property.
22 You can then access the correlation functions either by key or as property.
23 For example, you could access the static structure factor in the following ways::
25 sample.Sq # as property
26 sample['Sq'] # via key
28 The correlation functions are provided as numpy arrays.
30 There are several additional fields, the availability of which depends on the
31 type of correlation function that was sampled. Static samples, for example, do
32 not contain the `time` and `omega` fields. `q_norms` is only available if spherical
33 averaging was carried out, typically via :func:`get_spherically_averaged_sample_smearing`.
35 * `q_points`: list of q-point coordinates
36 * `q_norms`: norms of the momentum vector
37 * `time`: time
38 * `omega`: frequency
40 You can also see which fields are available by "printing" the :class:`Sample` object.
42 Parameters
43 ----------
44 data_dict
45 Dictionary with correlation functions.
46 simulation_data
47 Dictionary with simulation data. The following fields are strongly encouraged
48 (but not enforced): `atom_types`, `cell`, `particle_counts`.
49 history
50 Previous history of operations on :class:`Sample` object.
51 """
53 def __init__(
54 self,
55 data_dict: dict[str, Any],
56 simulation_data: dict[str, Any],
57 history: Optional[list[dict[str, Any]]] = None,
58 ):
59 # set data dict as attributes
60 self._data_keys = list(data_dict)
61 for key in data_dict:
62 setattr(self, key, data_dict[key])
64 # set metadata
65 # (using deepcopy here to avoid accidental transfer by reference)
66 self._metadata = dict()
67 self._metadata['simulation_data'] = deepcopy(simulation_data)
68 if history is not None:
69 logger.debug('Copying history')
70 self._metadata['history'] = deepcopy(history)
71 else:
72 self._metadata['history'] = []
74 def _append_history(
75 self,
76 calling_function: str,
77 caller_metadata: Optional[dict[str, Any]] = None,
78 ):
79 """Add record to history.
81 Parameters
82 ----------
83 calling_function
84 Name of calling function.
85 caller_metadata
86 Metadata associated with the calling function.
87 """
88 from dynasor import __version__ as dynasor_version
90 new_record = dict(func=calling_function)
91 if caller_metadata is not None:
92 new_record.update(caller_metadata.copy())
93 new_record.update(dict(
94 date_time=datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
95 username=getpass.getuser(),
96 hostname=socket.gethostname(),
97 dynasor_version=dynasor_version,
98 ))
99 self._metadata['history'].append(new_record)
101 def __getitem__(self, key):
102 """ Makes it possible to get the attributes using Sample['key'] """
103 try:
104 return getattr(self, key)
105 except AttributeError:
106 raise KeyError(key)
108 def write_to_npz(self, fname: str):
109 """ Write object to file in numpy npz format.
111 Parameters
112 ----------
113 fname
114 Name of the file in which to store the Sample object.
115 """
116 data_to_save = dict(name=self.__class__.__name__)
117 data_to_save['metadata'] = self._metadata
118 data_dict = dict()
119 for key in self._data_keys:
120 data_dict[key] = getattr(self, key)
121 data_to_save['data_dict'] = data_dict
122 np.savez_compressed(fname, **data_to_save)
124 @property
125 def available_correlation_functions(self) -> list[str]:
126 """ All the available correlation functions in sample. """
127 keys_to_skip = set(['q_points', 'q_norms', 'time', 'omega'])
128 return sorted(list(set(self._data_keys) - keys_to_skip))
130 @property
131 def dimensions(self) -> list[str]:
132 r"""The dimensions for the samples, e.g., for :math:`S(q, \omega)`
133 the dimensions would be the :math:`q` and :math:`\omega` axes.
134 """
135 keys_to_skip = set(self.available_correlation_functions)
136 return sorted(list(set(self._data_keys) - keys_to_skip))
138 @property
139 def metadata(self) -> dict[str, Any]:
140 """ Metadata. """
141 return deepcopy(self._metadata)
143 @property
144 def simulation_data(self) -> dict[str, Any]:
145 """ Simulation data. """
146 return deepcopy(self._metadata['simulation_data'])
148 @property
149 def history(self) -> list[dict[str, Any]]:
150 """ List of operations applied to this :class:`Sample` object. """
151 return deepcopy(self._metadata['history'])
153 @property
154 def atom_types(self) -> list[str]:
155 """ Simulation data: Atom types. """
156 return self.simulation_data['atom_types'].copy() \
157 if 'atom_types' in self.simulation_data else None
159 @property
160 def particle_counts(self) -> dict[str, int]:
161 """ Simulation data: Number of particles per type. """
162 return self.simulation_data['particle_counts'].copy() \
163 if 'particle_counts' in self.simulation_data else None
165 @property
166 def pairs(self) -> list[tuple[str, str]]:
167 """ Pairs of types for which correlation functions are available. """
168 return self.simulation_data['pairs'].copy() \
169 if 'pairs' in self.simulation_data else None
171 @property
172 def cell(self) -> NDArray[float]:
173 """ Simulation data: Cell metric. """
174 return self.simulation_data['cell'].copy() \
175 if 'cell' in self.simulation_data else None
177 @property
178 def has_incoherent(self) -> bool:
179 """ Whether this sample contains the incoherent correlation functions or not. """
180 return False
182 @property
183 def has_currents(self) -> bool:
184 """ Whether this sample contains the current correlation functions or not. """
185 return False
187 def __repr__(self) -> str:
188 return str(self)
190 _special_fields = dict(
191 atom_types='Atom types',
192 cell='Cell',
193 number_of_frames='Number of frames',
194 particle_counts='Particle counts',
195 maximum_time_lag='Maximum time lag',
196 angular_frequency_resolution='Angular frequency resolution',
197 time_between_frames='Time between frames',
198 )
200 def __str__(self) -> str:
201 s_contents = [self.__class__.__name__]
202 for key, value in sorted(self.simulation_data.items()):
203 s_contents.append(f'{self._special_fields.get(key, key)}: {value}')
204 for key in self.dimensions:
205 s_i = f'{key:15} with shape: {np.shape(getattr(self, key))}'
206 s_contents.append(s_i)
207 for key in self.available_correlation_functions:
208 s_i = f'{key:15} with shape: {np.shape(getattr(self, key))}'
209 s_contents.append(s_i)
210 s = '\n'.join(s_contents)
211 return s
213 def _repr_html_(self) -> str:
214 s = [f'<h3>{self.__class__.__name__}</h3>']
216 s += ['<h4>Simulation</h4>']
217 s += ['<table border="1" class="dataframe">']
218 s += ['<tbody>']
219 s += ['<thead><tr>'
220 '<th style="text-align: left">Name</th>'
221 '<th>Content</th>'
222 '</tr></thead>']
223 s += ['<tbody>']
224 for key, value in sorted(self.simulation_data.items()):
225 if key not in self._special_fields:
226 continue
227 s += [f'<tr><td style="text-align: left;">{self._special_fields[key]}</td>'
228 f'<td>{value}</td></tr>']
229 s += ['</tbody>']
230 s += ['</table>']
232 s += ['<h4>Dimensions</h4>']
233 s += ['<table border="1" class="dataframe">']
234 s += ['<tbody>']
235 s += ['<thead><tr>'
236 '<th style="text-align: left">Field</th>'
237 '<th>Size</th>'
238 '</tr></thead>']
239 s += ['<tbody>']
240 for key in self.dimensions:
241 s += [f'<tr><td style="text-align: left">{key}</td>'
242 f'<td>{np.shape(getattr(self, key))}</td></tr>']
243 s += ['</tbody>']
244 s += ['</table>']
246 s += ['<h4>History</h4>']
247 s += ['<table border="1" class="dataframe">']
248 s += ['<tbody>']
249 s += ['<thead><tr>'
250 '<th style="text-align: left">Function</th>'
251 '<th style="text-align: left">Field</th>'
252 '<th style="text-align: left">Content</th>'
253 '</tr></thead>']
254 s += ['<tbody>']
255 for entry in self.history: 255 ↛ 256line 255 didn't jump to line 256 because the loop on line 255 never started
256 title = entry.get('func', '')
257 for key, value in entry.items():
258 if key == 'func':
259 continue
260 s += [f'<tr><td style="text-align: left">{title}</td>'
261 f'<td style="text-align: left">{key}</td>'
262 f'<td>{value}</td></tr>']
263 title = ''
264 s += ['</tbody>']
265 s += ['</table>']
266 return '\n'.join(s)
269class StaticSample(Sample):
270 """
271 Class for holding static correlation functions and additional metadata.
272 Objects of this class are most commonly generated by calling
273 :func:`compute_static_structure_factors <dynasor.compute_static_structure_factors>`.
274 They can then be written to and subsequently read from file.
276 You can see which correlation functions are available via the
277 :attr:`available_correlation_functions` property.
278 You can then access the correlation functions either by key or as property.
279 For example, you could access the static structure factor :math:`S(q)`
280 in the following ways::
282 sample.Sq # as property
283 sample['Sq'] # via key
285 The correlation functions are provided as numpy arrays.
287 There are several additional fields:
289 * `q_points`: list of q-point coordinates
290 * `q_norms`: norms of the momentum vector (available when the :class:`StaticSample`
291 object was generated via :func:`get_spherically_averaged_sample_smearing
292 <dynasor.post_processing.get_spherically_averaged_sample_smearing>` or similar
293 functions)
295 You can also see which fields are available by "printing" the :class:`StaticSample` object.
297 Parameters
298 ----------
299 data_dict
300 Dictionary with correlation functions.
301 simulation_data
302 Dictionary with simulation data. The following fields are strongly encouraged
303 (but not enforced): `atom_types`, `cell`, `particle_counts`.
304 history
305 Previous history of operations on :class:`Sample` object.
306 """
308 def to_dataframe(self) -> DataFrame:
309 """ Returns correlation functions as pandas dataframe """
310 df = DataFrame()
311 for dim in self.dimensions:
312 df[dim] = self[dim].tolist() # to list to make q-points (N, 3) work in dataframe
313 for key in self.available_correlation_functions:
314 df[key] = self[key].reshape(-1, )
315 return df
318class DynamicSample(Sample):
319 r"""
320 Class for holding dynamic correlation functions and additional metadata.
321 Objects of this class are most commonly generated by calling
322 :func:`compute_dynamic_structure_factors <dynasor.compute_dynamic_structure_factors>`.
323 They can then be written to and subsequently read from file.
325 You can see which correlation functions are available via the
326 :attr:`available_correlation_functions` property.
327 You can then access the correlation functions either by key or as property.
328 For example, you could access the dynamic structure factor :math:`S(q,\omega)`
329 in the following ways::
331 sample.Sqw # as property
332 sample['Sqw'] # via key
334 The correlation functions are provided as numpy arrays.
336 There are several additional fields, the availability of which depends on the
337 type of correlation function that was sampled.
339 * `q_points`: list of q-point coordinates
340 * `q_norms`: norms of the momentum vector (available when the :class:`DynamicSample`
341 object was generated, e.g., via :func:`get_spherically_averaged_sample_smearing
342 <dynasor.post_processing.get_spherically_averaged_sample_smearing>` or similar
343 functions)
344 * `time`: time
345 * `omega`: frequency
347 You can also see which fields are available by "printing" the :class:`DynamicSample` object.
349 Parameters
350 ----------
351 data_dict
352 Dictionary with correlation functions.
353 simulation_data
354 Dictionary with simulation data. The following fields are strongly encouraged
355 (but not enforced): `atom_types`, `cell`, `particle_counts`.
356 history
357 Previous history of operations on :class:`Sample` object.
358 """
360 @property
361 def has_incoherent(self) -> bool:
362 return 'Fqt_incoh' in self.available_correlation_functions
364 @property
365 def has_currents(self) -> bool:
366 pair_string = '_'.join(self.pairs[0])
367 return f'Clqt_{pair_string}' in self.available_correlation_functions
369 def to_dataframe(self, q_index: int) -> DataFrame:
370 """ Returns correlation functions as pandas dataframe for the given q-index.
372 Parameters
373 ----------
374 q_index
375 Index of q-point to return.
376 """
377 df = DataFrame()
378 for dim in self.dimensions:
379 if dim in ['q_points', 'q_norms']:
380 continue
381 df[dim] = self[dim]
382 for key in self.available_correlation_functions:
383 df[key] = self[key][q_index]
384 return df
387def read_sample_from_npz(fname: str) -> Sample:
388 """ Read :class:`Sample <dynasor.sample.Sample>` from file.
390 Parameters
391 ----------
392 fname
393 Path to the file (numpy npz format) from which to read
394 the :class:`Sample <dynasor.sample.Sample>` object.
395 """
396 data_read = np.load(fname, allow_pickle=True)
397 try:
398 metadata = data_read['metadata'].item()
399 except KeyError:
400 # fallback for versions<=2.2
401 metadata = data_read['meta_data'].item()
403 if 'simulation_data' in metadata: 403 ↛ 407line 403 didn't jump to line 407 because the condition on line 403 was always true
404 logger.debug(f'Reading Sample object from {fname} assuming version >=2.3')
405 simulation_data = metadata['simulation_data']
406 else:
407 logger.debug(f'Reading Sample object from {fname} assuming version <=2.2')
408 simulation_data = {}
409 for key in [
410 'atom_types', 'pairs', 'particle_counts', 'cell',
411 'time_between_frames', 'maximum_time_lag', 'angular_frequency_resolution',
412 'maximum_angular_frequency', 'number_of_frames',
413 ]:
414 if key in metadata:
415 simulation_data[key] = metadata[key]
417 history = metadata['history'] if 'history' in metadata else None
419 data_dict = data_read['data_dict'].item()
420 if data_read['name'] == 'StaticSample': 420 ↛ 421line 420 didn't jump to line 421 because the condition on line 420 was never true
421 return StaticSample(data_dict, simulation_data, history=history)
422 elif data_read['name'] == 'DynamicSample':
423 return DynamicSample(data_dict, simulation_data, history=history)
424 else:
425 return Sample(data_dict, simulation_data, history=history)