import getpass
import socket
from copy import deepcopy
from datetime import datetime
from typing import Any
import numpy as np
import pandas as pd
from dynasor.logging_tools import logger
class Sample:
    """
    Class for holding correlation functions and additional metadata.
    Objects of this class are most commonly generated by calling functions such
    as :func:`compute_static_structure_factors` or
    :func:`compute_dynamic_structure_factors`.
    They can then be written to and subsequently read from file.

    You can see which correlation functions are available via the
    :attr:`available_correlation_functions` property.
    You can then access the correlation functions either by key or as property.
    For example, you could access the static structure factor in the following ways::

        sample.Sq     # as property
        sample['Sq']  # via key

    The correlation functions are provided as numpy arrays.

    There are several additional fields, the availability of which depends on the
    type of correlation function that was sampled. Static samples, for example, do
    not contain the `time` and `omega` fields. `q_norms` is only available if spherical
    averaging was carried out, typically via :func:`get_spherically_averaged_sample_smearing`.

    * `q_points`: list of q-point coordinates
    * `q_norms`: norms of the momentum vector
    * `time`: time
    * `omega`: frequency

    You can also see which fields are available by "printing" the :class:`Sample` object.

    Parameters
    ----------
    data_dict
        Dictionary with correlation functions. Every key becomes an attribute
        of the object.
    simulation_data
        Dictionary with simulation data. The following fields are strongly encouraged
        (but not enforced): `atom_types`, `cell`, `particle_counts`.
    history
        Previous history of operations on :class:`Sample` object.
    """

    # Human-readable labels for well-known simulation data fields; used when
    # rendering the object as text or HTML.
    _special_fields = dict(
        atom_types='Atom types',
        cell='Cell',
        number_of_frames='Number of frames',
        particle_counts='Particle counts',
        maximum_time_lag='Maximum time lag',
        angular_frequency_resolution='Angular frequency resolution',
        time_between_frames='Time between frames',
    )

    def __init__(
        self,
        data_dict: dict[str, Any],
        simulation_data: dict[str, Any],
        history: list[dict[str, Any]] = None,
    ):
        # set data dict as attributes
        self._data_keys = list(data_dict)
        for key in data_dict:
            setattr(self, key, data_dict[key])

        # set metadata
        # (using deepcopy here to avoid accidental transfer by reference)
        self._metadata = dict()
        self._metadata['simulation_data'] = deepcopy(simulation_data)
        if history is not None:
            logger.debug('Copying history')
            self._metadata['history'] = deepcopy(history)
        else:
            self._metadata['history'] = []

    def _append_history(
        self,
        calling_function: str,
        caller_metadata: dict[str, Any] = None,
    ):
        """Add record to history.

        The record always contains the name of the calling function, the
        current date and time, the user name, the host name, and the dynasor
        version. The latter four are written last and therefore take
        precedence over equally named keys in `caller_metadata`.

        Parameters
        ----------
        calling_function
            Name of calling function.
        caller_metadata
            Metadata associated with the calling function.
        """
        # imported here rather than at module level, as in the original code
        from dynasor import __version__ as dynasor_version

        new_record = dict(func=calling_function)
        if caller_metadata is not None:
            # dict.update only reads its argument, so no defensive copy is needed
            new_record.update(caller_metadata)
        new_record.update(dict(
            date_time=datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
            username=getpass.getuser(),
            hostname=socket.gethostname(),
            dynasor_version=dynasor_version,
        ))
        self._metadata['history'].append(new_record)

    def __getitem__(self, key):
        """ Makes it possible to get the attributes using Sample['key'] """
        try:
            return getattr(self, key)
        except AttributeError:
            # present a clean KeyError to the caller; the AttributeError is an
            # implementation detail of the attribute-based storage
            raise KeyError(key) from None

    def write_to_npz(self, fname: str):
        """ Write object to file in numpy npz format.

        The archive contains the class name (`name`), the metadata dictionary
        (`metadata`), and the correlation functions and dimensions (`data_dict`).

        Parameters
        ----------
        fname
            Name of the file in which to store the Sample object.
        """
        data_to_save = dict(name=self.__class__.__name__)
        data_to_save['metadata'] = self._metadata
        data_to_save['data_dict'] = {key: getattr(self, key) for key in self._data_keys}
        np.savez_compressed(fname, **data_to_save)

    @property
    def available_correlation_functions(self) -> list[str]:
        """ All the available correlation functions in sample. """
        keys_to_skip = {'q_points', 'q_norms', 'time', 'omega'}
        return sorted(set(self._data_keys) - keys_to_skip)

    @property
    def dimensions(self) -> list[str]:
        r"""The dimensions for the samples, e.g., for :math:`S(q, \omega)`
        the dimensions would be the :math:`q` and :math:`\omega` axes.
        """
        keys_to_skip = set(self.available_correlation_functions)
        return sorted(set(self._data_keys) - keys_to_skip)

    @property
    def metadata(self) -> dict[str, Any]:
        """ Metadata (returned as a deep copy). """
        return deepcopy(self._metadata)

    @property
    def simulation_data(self) -> dict[str, Any]:
        """ Simulation data (returned as a deep copy). """
        return deepcopy(self._metadata['simulation_data'])

    @property
    def history(self) -> list[dict[str, Any]]:
        """ List of operations applied to this :class:`Sample` object
        (returned as a deep copy). """
        return deepcopy(self._metadata['history'])

    def _copy_simulation_field(self, key: str) -> Any:
        """ Returns an independent copy of a single simulation data field
        or `None` if the field is not present.

        Only the requested field is copied, which avoids deep-copying the
        entire simulation data dictionary on every property access.
        """
        simulation_data = self._metadata['simulation_data']
        if key not in simulation_data:
            return None
        return deepcopy(simulation_data[key])

    @property
    def atom_types(self) -> list[str]:
        """ Simulation data: Atom types (`None` if not available). """
        return self._copy_simulation_field('atom_types')

    @property
    def particle_counts(self) -> dict[str, int]:
        """ Simulation data: Number of particles per type (`None` if not available). """
        return self._copy_simulation_field('particle_counts')

    @property
    def pairs(self) -> list[tuple[str, str]]:
        """ Pairs of types for which correlation functions are available
        (`None` if not available). """
        return self._copy_simulation_field('pairs')

    @property
    def cell(self) -> np.ndarray:
        """ Simulation data: Cell metric (`None` if not available). """
        return self._copy_simulation_field('cell')

    @property
    def has_incoherent(self):
        """ Whether this sample contains the incoherent correlation functions or not. """
        return False

    @property
    def has_currents(self):
        """ Whether this sample contains the current correlation functions or not. """
        return False

    def __repr__(self):
        return str(self)

    def __str__(self):
        s_contents = [self.__class__.__name__]
        # read-only access, hence no need to go through the copying property
        for key, value in sorted(self._metadata['simulation_data'].items()):
            s_contents.append(f'{self._special_fields.get(key, key)}: {value}')
        # dimensions first, then correlation functions
        for key in self.dimensions + self.available_correlation_functions:
            s_contents.append(f'{key:15} with shape: {np.shape(getattr(self, key))}')
        return '\n'.join(s_contents)

    def _repr_html_(self) -> str:
        """ HTML representation with one table each for the simulation data,
        the dimensions, and the history. Only simulation data fields with a
        known label (see `_special_fields`) are listed.
        """
        from html import escape

        def text(value) -> str:
            # cell content must not be interpreted as markup
            return escape(str(value), quote=False)

        s = [f'<h3>{self.__class__.__name__}</h3>']

        # Each table consists of exactly one <thead> followed by one <tbody>.
        s += ['<h4>Simulation</h4>']
        s += ['<table border="1" class="dataframe">']
        s += ['<thead><tr>'
              '<th style="text-align: left">Name</th>'
              '<th>Content</th>'
              '</tr></thead>']
        s += ['<tbody>']
        for key, value in sorted(self._metadata['simulation_data'].items()):
            if key not in self._special_fields:
                continue
            s += [f'<tr><td style="text-align: left;">{self._special_fields[key]}</td>'
                  f'<td>{text(value)}</td></tr>']
        s += ['</tbody>']
        s += ['</table>']

        s += ['<h4>Dimensions</h4>']
        s += ['<table border="1" class="dataframe">']
        s += ['<thead><tr>'
              '<th style="text-align: left">Field</th>'
              '<th>Size</th>'
              '</tr></thead>']
        s += ['<tbody>']
        for key in self.dimensions:
            s += [f'<tr><td style="text-align: left">{text(key)}</td>'
                  f'<td>{np.shape(getattr(self, key))}</td></tr>']
        s += ['</tbody>']
        s += ['</table>']

        s += ['<h4>History</h4>']
        s += ['<table border="1" class="dataframe">']
        s += ['<thead><tr>'
              '<th style="text-align: left">Function</th>'
              '<th style="text-align: left">Field</th>'
              '<th style="text-align: left">Content</th>'
              '</tr></thead>']
        s += ['<tbody>']
        for entry in self._metadata['history']:
            # the function name is only shown in the first row of each entry
            title = entry.get('func', '')
            for key, value in entry.items():
                if key == 'func':
                    continue
                s += [f'<tr><td style="text-align: left">{text(title)}</td>'
                      f'<td style="text-align: left">{text(key)}</td>'
                      f'<td>{text(value)}</td></tr>']
                title = ''
        s += ['</tbody>']
        s += ['</table>']
        return '\n'.join(s)
# (stray "[docs]" link label from the rendered documentation; not part of the code)
class StaticSample(Sample):
    """
    Class for holding static correlation functions and additional metadata.
    Objects of this class are most commonly generated by calling
    :func:`compute_static_structure_factors <dynasor.compute_static_structure_factors>`.
    They can then be written to and subsequently read from file.

    You can see which correlation functions are available via the
    :attr:`available_correlation_functions` property.
    You can then access the correlation functions either by key or as property.
    For example, you could access the static structure factor :math:`S(q)`
    in the following ways::

        sample.Sq     # as property
        sample['Sq']  # via key

    The correlation functions are provided as numpy arrays.

    There are additional fields, the availability of which depends on the
    type of correlation function that was sampled:

    * `q_points`: list of q-point coordinates
    * `q_norms`: norms of the momentum vector (only available if spherical
      averaging was carried out, typically via
      :func:`get_spherically_averaged_sample_smearing
      <dynasor.post_processing.get_spherically_averaged_sample_smearing>` or similar
      functions)

    You can also see which fields are available by "printing" the :class:`StaticSample` object.

    Parameters
    ----------
    data_dict
        Dictionary with correlation functions.
    simulation_data
        Dictionary with simulation data. The following fields are strongly encouraged
        (but not enforced): `atom_types`, `cell`, `particle_counts`.
    history
        Previous history of operations on :class:`Sample` object.
    """

    def to_dataframe(self):
        """ Returns correlation functions as pandas dataframe.

        The dataframe has one column per dimension followed by one column per
        correlation function; the latter are flattened to one dimension.
        """
        frame = pd.DataFrame()
        for name in self.dimensions:
            # nested lists keep each row of an (N, 3) q-point array in a single cell
            frame[name] = self[name].tolist()
        for name in self.available_correlation_functions:
            frame[name] = self[name].reshape(-1)
        return frame
# (stray "[docs]" link label from the rendered documentation; not part of the code)
class DynamicSample(Sample):
    r"""
    Class for holding dynamic correlation functions and additional metadata.
    Objects of this class are most commonly generated by calling
    :func:`compute_dynamic_structure_factors <dynasor.compute_dynamic_structure_factors>`.
    They can then be written to and subsequently read from file.

    You can see which correlation functions are available via the
    :attr:`available_correlation_functions` property.
    You can then access the correlation functions either by key or as property.
    For example, you could access the dynamic structure factor :math:`S(q,\omega)`
    in the following ways::

        sample.Sqw     # as property
        sample['Sqw']  # via key

    The correlation functions are provided as numpy arrays.

    There are several additional fields, the availability of which depends on the
    type of correlation function that was sampled.

    * `q_points`: list of q-point coordinates
    * `q_norms`: norms of the momentum vector (available when the :class:`DynamicSample`
      object was generated, e.g., via :func:`get_spherically_averaged_sample_smearing
      <dynasor.post_processing.get_spherically_averaged_sample_smearing>` or similar
      functions)
    * `time`: time
    * `omega`: frequency

    You can also see which fields are available by "printing" the :class:`DynamicSample` object.

    Parameters
    ----------
    data_dict
        Dictionary with correlation functions.
    simulation_data
        Dictionary with simulation data. The following fields are strongly encouraged
        (but not enforced): `atom_types`, `cell`, `particle_counts`.
    history
        Previous history of operations on :class:`Sample` object.
    """

    @property
    def has_incoherent(self) -> bool:
        """ Whether this sample contains the incoherent correlation functions or not,
        i.e., whether `Fqt_incoh` is among the available correlation functions. """
        return 'Fqt_incoh' in self.available_correlation_functions

    @property
    def has_currents(self) -> bool:
        """ Whether this sample contains the current correlation functions or not.

        This is decided by looking for the longitudinal current correlation
        function `Clqt_<type1>_<type2>` of the first pair. If no pairs are
        recorded in the simulation data, `False` is returned.
        """
        pairs = self.pairs
        # pairs is None when the simulation data has no 'pairs' entry; without
        # this guard the indexing below would raise instead of answering False
        if pairs is None or len(pairs) == 0:
            return False
        pair_string = '_'.join(pairs[0])
        return f'Clqt_{pair_string}' in self.available_correlation_functions

    def to_dataframe(self, q_index: int) -> pd.DataFrame:
        """ Returns correlation functions as pandas dataframe for the given q-index.

        The dataframe has one column per dimension other than `q_points` and
        `q_norms`, followed by one column per correlation function evaluated
        at the selected q-point.

        Parameters
        ----------
        q_index
            index of q-point to return
        """
        df = pd.DataFrame()
        for dim in self.dimensions:
            # the q-point is fixed by q_index and hence not a column
            if dim in ('q_points', 'q_norms'):
                continue
            df[dim] = self[dim]
        for key in self.available_correlation_functions:
            df[key] = self[key][q_index]
        return df
# (stray "[docs]" link label from the rendered documentation; not part of the code)
def read_sample_from_npz(fname: str) -> Sample:
    """ Read :class:`Sample <dynasor.sample.Sample>` from file.

    Files that store the metadata under the legacy key `meta_data` and/or
    without a `simulation_data` entry (version <=2.2) are supported as well.
    The class of the returned object (:class:`StaticSample`,
    :class:`DynamicSample`, or :class:`Sample`) is chosen based on the class
    name stored in the file.

    Parameters
    ----------
    fname
        Path to the file (numpy npz format) from which to read
        the :class:`Sample <dynasor.sample.Sample>` object.

    Raises
    ------
    KeyError
        If the file contains neither `metadata` nor `meta_data`, or lacks
        `data_dict` or `name`.
    """
    # NOTE(security): the dictionaries are stored as pickled objects, which is
    # why allow_pickle=True is needed. Unpickling can execute arbitrary code,
    # so only files from trusted sources should be read.
    # The context manager makes sure that the underlying file is closed again.
    with np.load(fname, allow_pickle=True) as data_read:
        try:
            metadata = data_read['metadata'].item()
        except KeyError:
            # fallback for <=2.2
            metadata = data_read['meta_data'].item()
        data_dict = data_read['data_dict'].item()
        class_name = data_read['name'].item()

    if 'simulation_data' in metadata:
        logger.debug(f'Reading Sample object from {fname} assuming version >=2.3')
        simulation_data = metadata['simulation_data']
    else:
        logger.debug(f'Reading Sample object from {fname} assuming version <=2.2')
        # older files keep the simulation data at the top level of the metadata
        legacy_keys = [
            'atom_types', 'pairs', 'particle_counts', 'cell',
            'time_between_frames', 'maximum_time_lag', 'angular_frequency_resolution',
            'maximum_angular_frequency', 'number_of_frames',
        ]
        simulation_data = {key: metadata[key] for key in legacy_keys if key in metadata}

    history = metadata.get('history')

    # unknown class names fall back to the generic Sample class
    sample_classes = {'StaticSample': StaticSample, 'DynamicSample': DynamicSample}
    sample_class = sample_classes.get(class_name, Sample)
    return sample_class(data_dict, simulation_data, history=history)