# Source code for dynasor.sample

import getpass
import socket
from copy import deepcopy
from datetime import datetime
from typing import Any
import numpy as np
import pandas as pd
from dynasor.logging_tools import logger


class Sample:
    """
    Class for holding correlation functions and additional metadata.
    Objects of this class are most commonly generated by calling functions such
    as :func:`compute_static_structure_factors` or
    :func:`compute_dynamic_structure_factors`.
    They can then be written to and subsequently read from file.

    You can see which correlation functions are available via the
    :attr:`available_correlation_functions` property.
    You can then access the correlation functions either by key or as property.
    For example, you could access the static structure factor in the following ways::

        sample.Sq  # as property
        sample['Sq']  # via key

    The correlation functions are provided as numpy arrays.

    There are several additional fields, the availability of which depends on the
    type of correlation function that was sampled. Static samples, for example, do
    not contain the `time` and `omega` fields. `q_norms` is only available if spherical
    averaging was carried out, typically via :func:`get_spherically_averaged_sample_smearing`.

    * `q_points`: list of q-point coordinates
    * `q_norms`: norms of the momentum vector
    * `time`: time
    * `omega`: frequency

    You can also see which fields are available by "printing" the :class:`Sample` object.

    Parameters
    ----------
    data_dict
        Dictionary with correlation functions. Every key becomes an attribute
        of the object (the values are stored by reference, not copied).
    simulation_data
        Dictionary with simulation data. The following fields are strongly encouraged
        (but not enforced): `atom_types`, `cell`, `particle_counts`.
    history
        Previous history of operations on :class:`Sample` object.
        If `None` the object starts with an empty history.
    """

    def __init__(
        self,
        data_dict: dict[str, Any],
        simulation_data: dict[str, Any],
        history: list[dict[str, Any]] = None,
    ):
        # set data dict as attributes
        self._data_keys = list(data_dict)
        for key, value in data_dict.items():
            setattr(self, key, value)

        # set metadata
        # (using deepcopy here to avoid accidental transfer by reference)
        self._metadata = dict()
        self._metadata['simulation_data'] = deepcopy(simulation_data)
        if history is not None:
            logger.debug('Copying history')
            self._metadata['history'] = deepcopy(history)
        else:
            self._metadata['history'] = []

    def _append_history(
        self,
        calling_function: str,
        caller_metadata: dict[str, Any] = None,
    ):
        """Add record to history.

        The record contains the name of the calling function, the caller
        metadata (if any), and a time stamp, the user name, the host name and
        the dynasor version. The latter four are written last and therefore
        take precedence over identically named keys in `caller_metadata`.

        Parameters
        ----------
        calling_function
            Name of calling function.
        caller_metadata
            Metadata associated with the calling function.
        """
        # local import, as in the original code
        from dynasor import __version__ as dynasor_version

        new_record = dict(func=calling_function)
        if caller_metadata is not None:
            new_record.update(caller_metadata)
        new_record.update(dict(
            date_time=datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
            username=getpass.getuser(),
            hostname=socket.gethostname(),
            dynasor_version=dynasor_version,
        ))
        self._metadata['history'].append(new_record)

    def __getitem__(self, key):
        """ Makes it possible to get the attributes using Sample['key'].

        Raises
        ------
        KeyError
            If the object has no attribute named `key`.
        """
        try:
            return getattr(self, key)
        except AttributeError:
            # the AttributeError is an implementation detail; do not chain it
            raise KeyError(key) from None

    def write_to_npz(self, fname: str):
        """ Write object to file in numpy npz format.

        The class name, the metadata, and the data dictionary are stored under
        the keys `name`, `metadata`, and `data_dict`, respectively.

        Parameters
        ----------
        fname
            Name of the file in which to store the Sample object.
            (numpy appends the ``.npz`` extension if it is missing.)
        """
        data_to_save = dict(name=self.__class__.__name__)
        data_to_save['metadata'] = self._metadata
        data_to_save['data_dict'] = {key: getattr(self, key) for key in self._data_keys}
        np.savez_compressed(fname, **data_to_save)

    @property
    def available_correlation_functions(self) -> list[str]:
        """ All the available correlation functions in sample (sorted by name). """
        keys_to_skip = {'q_points', 'q_norms', 'time', 'omega'}
        return sorted(set(self._data_keys) - keys_to_skip)

    @property
    def dimensions(self) -> list[str]:
        r"""The dimensions for the samples, e.g., for :math:`S(q, \omega)`
        the dimensions would be the :math:`q` and :math:`\omega` axes.
        """
        keys_to_skip = set(self.available_correlation_functions)
        return sorted(set(self._data_keys) - keys_to_skip)

    @property
    def metadata(self) -> dict[str, Any]:
        """ Metadata (returned as a deep copy). """
        return deepcopy(self._metadata)

    @property
    def simulation_data(self) -> dict[str, Any]:
        """ Simulation data (returned as a deep copy). """
        return deepcopy(self._metadata['simulation_data'])

    @property
    def history(self) -> list[dict[str, Any]]:
        """ List of operations applied to this :class:`Sample` object
        (returned as a deep copy). """
        return deepcopy(self._metadata['history'])

    def _get_simulation_field(self, key: str) -> Any:
        """ Returns a copy of a single simulation data field or `None` if the
        field is not available. Only the requested field is copied rather than
        the entire simulation data dictionary. """
        simulation_data = self._metadata['simulation_data']
        if key not in simulation_data:
            return None
        return deepcopy(simulation_data[key])

    @property
    def atom_types(self) -> list[str]:
        """ Simulation data: Atom types (`None` if not available). """
        return self._get_simulation_field('atom_types')

    @property
    def particle_counts(self) -> dict[str, int]:
        """ Simulation data: Number of particles per type (`None` if not available). """
        return self._get_simulation_field('particle_counts')

    @property
    def pairs(self) -> list[tuple[str, str]]:
        """ Pairs of types for which correlation functions are available
        (`None` if not available). """
        return self._get_simulation_field('pairs')

    @property
    def cell(self) -> np.ndarray:
        """ Simulation data: Cell metric (`None` if not available). """
        return self._get_simulation_field('cell')

    @property
    def has_incoherent(self):
        """ Whether this sample contains the incoherent correlation functions or not. """
        return False

    @property
    def has_currents(self):
        """ Whether this sample contains the current correlation functions or not. """
        return False

    def __repr__(self):
        return str(self)

    # simulation data fields with a human-readable label;
    # only these fields are included in the HTML representation
    _special_fields = dict(
        atom_types='Atom types',
        cell='Cell',
        number_of_frames='Number of frames',
        particle_counts='Particle counts',
        maximum_time_lag='Maximum time lag',
        angular_frequency_resolution='Angular frequency resolution',
        time_between_frames='Time between frames',
    )

    def __str__(self):
        """ Plain-text summary: class name, simulation data, and the shapes of
        the dimensions and the correlation functions. """
        s_contents = [self.__class__.__name__]
        for key, value in sorted(self.simulation_data.items()):
            s_contents.append(f'{self._special_fields.get(key, key)}: {value}')
        # dimensions first, then correlation functions (both sorted by name)
        for key in self.dimensions + self.available_correlation_functions:
            s_contents.append(f'{key:15} with shape: {np.shape(getattr(self, key))}')
        return '\n'.join(s_contents)

    def _repr_html_(self) -> str:
        """ HTML summary (used, e.g., by Jupyter notebooks) with one table each
        for the simulation data, the dimensions, and the history. """
        from html import escape

        def text(obj: Any) -> str:
            # escape &, < and > so that arbitrary content cannot break the markup
            return escape(str(obj), quote=False)

        s = [f'<h3>{self.__class__.__name__}</h3>']

        # simulation data; fields without a human-readable label are skipped
        s += ['<h4>Simulation</h4>']
        s += ['<table border="1" class="dataframe">']
        s += ['<thead><tr>'
              '<th style="text-align: left">Name</th>'
              '<th>Content</th>'
              '</tr></thead>']
        s += ['<tbody>']
        for key, value in sorted(self.simulation_data.items()):
            if key not in self._special_fields:
                continue
            s += [f'<tr><td style="text-align: left;">{self._special_fields[key]}</td>'
                  f'<td>{text(value)}</td></tr>']
        s += ['</tbody>']
        s += ['</table>']

        # dimensions and their shapes
        s += ['<h4>Dimensions</h4>']
        s += ['<table border="1" class="dataframe">']
        s += ['<thead><tr>'
              '<th style="text-align: left">Field</th>'
              '<th>Size</th>'
              '</tr></thead>']
        s += ['<tbody>']
        for key in self.dimensions:
            s += [f'<tr><td style="text-align: left">{text(key)}</td>'
                  f'<td>{np.shape(getattr(self, key))}</td></tr>']
        s += ['</tbody>']
        s += ['</table>']

        # history; the function name is only shown on the first row of each record
        s += ['<h4>History</h4>']
        s += ['<table border="1" class="dataframe">']
        s += ['<thead><tr>'
              '<th style="text-align: left">Function</th>'
              '<th style="text-align: left">Field</th>'
              '<th style="text-align: left">Content</th>'
              '</tr></thead>']
        s += ['<tbody>']
        for entry in self.history:
            title = text(entry.get('func', ''))
            for key, value in entry.items():
                if key == 'func':
                    continue
                s += [f'<tr><td style="text-align: left">{title}</td>'
                      f'<td style="text-align: left">{text(key)}</td>'
                      f'<td>{text(value)}</td></tr>']
                title = ''
        s += ['</tbody>']
        s += ['</table>']
        return '\n'.join(s)


class StaticSample(Sample):
    """
    Class for holding static correlation functions and additional metadata.
    Objects of this class are most commonly generated by calling
    :func:`compute_static_structure_factors <dynasor.compute_static_structure_factors>`.
    They can then be written to and subsequently read from file.

    You can see which correlation functions are available via the
    :attr:`available_correlation_functions` property.
    You can then access the correlation functions either by key or as property.
    For example, you could access the static structure factor :math:`S(q)`
    in the following ways::

        sample.Sq  # as property
        sample['Sq']  # via key

    The correlation functions are provided as numpy arrays.

    There are additional fields, the availability of which depends on the
    type of correlation function that was sampled:

    * `q_points`: list of q-point coordinates
    * `q_norms`: norms of the momentum vector
      (available when the :class:`StaticSample` object was generated, e.g., via
      :func:`get_spherically_averaged_sample_smearing
      <dynasor.post_processing.get_spherically_averaged_sample_smearing>`
      or similar functions)

    You can also see which fields are available by "printing" the
    :class:`StaticSample` object.

    Parameters
    ----------
    data_dict
        Dictionary with correlation functions.
    simulation_data
        Dictionary with simulation data. The following fields are strongly encouraged
        (but not enforced): `atom_types`, `cell`, `particle_counts`.
    history
        Previous history of operations on :class:`Sample` object.
    """

    def to_dataframe(self) -> pd.DataFrame:
        """ Returns correlation functions as pandas dataframe.

        The dataframe contains one column per dimension followed by one
        column per correlation function; the latter are flattened to one
        dimension.
        """
        df = pd.DataFrame()
        for dim in self.dimensions:
            # to list to make q-points (N, 3) work in dataframe
            df[dim] = self[dim].tolist()
        for key in self.available_correlation_functions:
            df[key] = self[key].reshape(-1, )
        return df
class DynamicSample(Sample):
    r"""
    Class for holding dynamic correlation functions and additional metadata.
    Objects of this class are most commonly generated by calling
    :func:`compute_dynamic_structure_factors <dynasor.compute_dynamic_structure_factors>`.
    They can then be written to and subsequently read from file.

    You can see which correlation functions are available via the
    :attr:`available_correlation_functions` property.
    You can then access the correlation functions either by key or as property.
    For example, you could access the dynamic structure factor :math:`S(q,\omega)`
    in the following ways::

        sample.Sqw  # as property
        sample['Sqw']  # via key

    The correlation functions are provided as numpy arrays.

    There are several additional fields, the availability of which depends on the
    type of correlation function that was sampled.

    * `q_points`: list of q-point coordinates
    * `q_norms`: norms of the momentum vector
      (available when the :class:`DynamicSample` object was generated, e.g., via
      :func:`get_spherically_averaged_sample_smearing
      <dynasor.post_processing.get_spherically_averaged_sample_smearing>`
      or similar functions)
    * `time`: time
    * `omega`: frequency

    You can also see which fields are available by "printing" the
    :class:`DynamicSample` object.

    Parameters
    ----------
    data_dict
        Dictionary with correlation functions.
    simulation_data
        Dictionary with simulation data. The following fields are strongly encouraged
        (but not enforced): `atom_types`, `cell`, `particle_counts`.
    history
        Previous history of operations on :class:`Sample` object.
    """

    @property
    def has_incoherent(self):
        """ Whether this sample contains the incoherent correlation functions or not
        (determined by the presence of `Fqt_incoh`). """
        return 'Fqt_incoh' in self.available_correlation_functions

    @property
    def has_currents(self):
        """ Whether this sample contains the current correlation functions or not
        (determined by the presence of `Clqt_<A>_<B>` for the first pair of types).
        Returns `False` if no pairs are available in the simulation data. """
        pairs = self.pairs
        # pairs is None if the simulation data has no `pairs` field;
        # len() is used (rather than truthiness) so that array-like values also work
        if pairs is None or len(pairs) == 0:
            return False
        pair_string = '_'.join(pairs[0])
        return f'Clqt_{pair_string}' in self.available_correlation_functions

    def to_dataframe(self, q_index: int) -> pd.DataFrame:
        """ Returns correlation functions as pandas dataframe for the given q-index.

        The q-point dimensions (`q_points`, `q_norms`) are not included in the
        dataframe; the remaining dimensions become columns, followed by one
        column per correlation function.

        Parameters
        ----------
        q_index
            index of q-point to return
        """
        df = pd.DataFrame()
        for dim in self.dimensions:
            if dim in ['q_points', 'q_norms']:
                continue
            df[dim] = self[dim]
        for key in self.available_correlation_functions:
            df[key] = self[key][q_index]
        return df
def read_sample_from_npz(fname: str) -> Sample:
    """ Read :class:`Sample <dynasor.sample.Sample>` from file.

    The class of the returned object (:class:`StaticSample`,
    :class:`DynamicSample`, or plain :class:`Sample`) is selected based on
    the class name stored in the file.

    Parameters
    ----------
    fname
        Path to the file (numpy npz format) from which to read the
        :class:`Sample <dynasor.sample.Sample>` object.
    """
    # NOTE(security): allow_pickle=True is needed since the dictionaries are
    # stored as pickled object arrays. Unpickling can execute arbitrary code,
    # so only read files from trusted sources.
    # The context manager makes sure the underlying file handle is closed.
    with np.load(fname, allow_pickle=True) as data_read:
        try:
            metadata = data_read['metadata'].item()
        except KeyError:
            # fallback for <=2.2
            metadata = data_read['meta_data'].item()
        data_dict = data_read['data_dict'].item()
        # stored as 0-d array; unwrap to a plain Python string
        name = data_read['name'].item()

    if 'simulation_data' in metadata:
        logger.debug(f'Reading Sample object from {fname} assuming version >=2.3')
        simulation_data = metadata['simulation_data']
    else:
        # in older files the simulation data are stored at the top level of the metadata
        logger.debug(f'Reading Sample object from {fname} assuming version <=2.2')
        legacy_keys = [
            'atom_types',
            'pairs',
            'particle_counts',
            'cell',
            'time_between_frames',
            'maximum_time_lag',
            'angular_frequency_resolution',
            'maximum_angular_frequency',
            'number_of_frames',
        ]
        simulation_data = {key: metadata[key] for key in legacy_keys if key in metadata}

    history = metadata.get('history')

    if name == 'StaticSample':
        return StaticSample(data_dict, simulation_data, history=history)
    elif name == 'DynamicSample':
        return DynamicSample(data_dict, simulation_data, history=history)
    else:
        return Sample(data_dict, simulation_data, history=history)