Source code for dynasor.tools.acfs

"""
A number of utility functions, for example for dealing with
autocorrelation functions, Fourier transforms, and smoothing.
"""

from typing import Optional
import numpy as np
from scipy.signal import correlate
from numpy.typing import NDArray
import pandas as pd


[docs] def psd_from_acf( acf: NDArray[float], dt: Optional[float] = 1, even: Optional[bool] = True, ) -> NDArray[float]: """Computes the power spectral density (PSD) from an auto-correlation function (ACF). Let x(t) be a time signal and define its auto-correlation function C(τ) = ⟨ x(t) x(t+τ) ⟩ where ⟨·⟩ denotes a time or ensemble average. According to the Wiener–Khinchin theorem, the power spectral density (PSD) of x(t) is the Fourier transform of its auto-correlation function: S(ω) = ∫ C(τ) e^{-i ω τ} dτ This function computes the PSD by performing a discrete Fourier transform of the provided ACF. Parameters ---------- acf The auto-correlation function C(τ) evaluated at discrete time lags. dt Time spacing between consecutive samples in the ACF. even Whether the ACF is assumed to be even in time. If True, the ACF is mirrored to construct a two-sided correlation function before computing the Fourier transform. Returns ------- freqs, psd Angular frequencies ω and corresponding PSD values. """ assert acf.ndim == 1 signal = np.asarray(acf) if even: signal = np.hstack((signal, signal[:0:-1])) fft = dt * np.fft.rfft(signal) if even: assert np.allclose(fft.imag, 0) psd = fft.real if even else fft freqs = np.fft.rfftfreq(len(signal), dt) freqs = 2 * np.pi * freqs return freqs, psd
[docs] def psd_from_acf_2d( acf: NDArray[float], dt: Optional[float] = 1, even: Optional[bool] = True, ) -> NDArray[float]: """Computes the power spectral density (PSD) from an auto-correlation function (ACF). This is the 2D analogue of `psd_from_acf`, where the ACF is provided for multiple signals, e.g. F(q, t). The Fourier transform is performed along the last axis. Parameters ---------- acf The ACF as a 2D array with shape (N, Nt), where the last axis corresponds to time lags. dt The time step between samples. even Whether the ACF is even in time and should be mirrored before computing the Fourier transform. Returns ------- freqs, psd Angular frequencies ω and corresponding PSD values with shape (N, Nω). """ assert acf.ndim == 2 signal = np.asarray(acf) if even: signal = np.hstack((signal, signal[:, :0:-1])) fft = dt * np.fft.rfft(signal, axis=1) if even: assert np.allclose(fft.imag, 0) psd = fft.real if even else fft freqs = np.fft.rfftfreq(signal.shape[1], dt) freqs = 2 * np.pi * freqs return freqs, psd
[docs] def psd_from_time_signal( x: NDArray[float], dt: Optional[float] = 1, complex: Optional[bool] = False, ) -> NDArray[float]: """Computes the power spectral density (PSD) directly from a time signal. Let x(t) be a time signal. The power spectral density (PSD) can be computed directly from the Fourier transform of the signal: ``S(ω) = |F[x(t)]|²`` where F denotes the Fourier transform. This function computes the PSD by performing a discrete Fourier transform of the time signal and taking the squared magnitude of the transform. This is equivalent to computing the PSD from the Fourier transform of the auto-correlation function. Parameters ---------- x Time signal as a 1D array. dt Time spacing between consecutive samples. complex Whether the time signal is complex. If False (default), the signal is assumed to be real and the PSD is computed using a real FFT (`rfft`), returning only non-negative frequencies. If True, the full FFT (`fft`) is used and both positive and negative frequencies are returned. Returns ------- freqs, psd Angular frequencies ω and corresponding PSD values. """ assert x.ndim == 1 signal = np.asarray(x) N = len(signal) if complex: fft = np.fft.fft(signal) freqs = np.fft.fftfreq(N, dt) else: fft = np.fft.rfft(signal) freqs = np.fft.rfftfreq(N, dt) psd = dt * np.abs(fft) ** 2 / N freqs = 2 * np.pi * freqs return freqs, psd
[docs] def compute_acf( Z: NDArray[float], delta_t: Optional[float] = 1.0, method: Optional[str] = 'scipy', ) -> NDArray[float]: r""" Computes the autocorrelation function (ACF) for a one-dimensional signal :math:`Z` in time as .. math:: \text{ACF}(\tau) = \frac{\left < Z(t) Z^*(t+\tau) \right >}{\left < Z(t) Z^*(t) \right >} Here, only the real part of the ACF is returned since if :math:`Z` is complex the imaginary part should average out to zero for any stationary signal. Parameters ---------- Z Complex time signal. delta_t Spacing in time between two consecutive values in :math:`Z`. method Implementation to use; possible values: `numpy` and `scipy` (default and usually faster). """ # keep only real part and normalize acf = _compute_correlation_function(Z, Z, method) acf = np.real(acf) acf /= acf[0] time_lags = delta_t * np.arange(0, len(acf), 1) return time_lags, acf
def _compute_correlation_function(Z1, Z2, method: Optional[str] = 'scipy'): N = len(Z1) assert len(Z1) == len(Z2) if method == 'scipy': cf = correlate(Z1, Z2, mode='full')[N - 1:] / np.arange(N, 0, -1) elif method == 'numpy': cf = np.correlate(Z1, Z2, mode='full')[N - 1:] / np.arange(N, 0, -1) else: raise ValueError('method must be either numpy or scipy') return cf # smoothing functions / FFT filters # -------------------------------------
[docs] def gaussian_decay(t: NDArray[float], t_sigma: float) -> NDArray[float]: r""" Evaluates a gaussian distribution in time :math:`f(t)`, which can be applied to an ACF in time to artificially damp it, i.e., forcing it to go to zero for long times. .. math:: f(t) = \exp{\left [-\frac{1}{2} \left (\frac{t}{t_\mathrm{sigma}}\right )^2 \right ] } Parameters ---------- t Time array. t_sigma Width (standard deviation of the gaussian) of the decay. """ return np.exp(- 1 / 2 * (t / t_sigma) ** 2)
[docs] def fermi_dirac(t: NDArray[float], t_0: float, t_width: float) -> NDArray[float]: r""" Evaluates a Fermi-Dirac-like function in time :math:`f(t)`, which can be applied to an auto-correlation function (ACF) in time to artificially dampen it, i.e., forcing it to go to zero for long times without affecting the short-time correlations too much. .. math:: f(t) = \frac{1}{\exp{[(t-t_0)/t_\mathrm{width}}] + 1} Parameters ---------- t Time array. t_0 Starting time for decay. t_width Width of the decay. """ return 1.0 / (np.exp((t - t_0) / t_width) + 1)
[docs] def smoothing_function( data: NDArray[float], window_size: int, window_type: Optional[str] = 'hamming', ) -> NDArray[float]: """ Smoothing function for 1D arrays. This functions employs the pandas rolling window average function. Parameters ---------- data 1D data array. window_size The size of smoothing/smearing window. window_type What type of window-shape to use, e.g. ``'blackman'``, ``'hamming'``, ``'boxcar'`` (see pandas and scipy documentaiton for more details). """ series = pd.Series(data) new_data = series.rolling(window_size, win_type=window_type, center=True, min_periods=1).mean() return np.array(new_data)