"""
A number of utility functions, for example for dealing with
autocorrelation functions, Fourier transforms, and smoothing.
"""
from typing import Optional
import numpy as np
from scipy.signal import correlate
from numpy.typing import NDArray
import pandas as pd
[docs]
def psd_from_acf(
acf: NDArray[float],
dt: Optional[float] = 1,
even: Optional[bool] = True,
) -> NDArray[float]:
"""Computes the power spectral density (PSD) from an auto-correlation function (ACF).
Let x(t) be a time signal and define its auto-correlation function
C(τ) = ⟨ x(t) x(t+τ) ⟩
where ⟨·⟩ denotes a time or ensemble average. According to the
Wiener–Khinchin theorem, the power spectral density (PSD) of x(t) is
the Fourier transform of its auto-correlation function:
S(ω) = ∫ C(τ) e^{-i ω τ} dτ
This function computes the PSD by performing a discrete Fourier
transform of the provided ACF.
Parameters
----------
acf
The auto-correlation function C(τ) evaluated at discrete time lags.
dt
Time spacing between consecutive samples in the ACF.
even
Whether the ACF is assumed to be even in time. If True, the ACF
is mirrored to construct a two-sided correlation function before
computing the Fourier transform.
Returns
-------
freqs, psd
Angular frequencies ω and corresponding PSD values.
"""
assert acf.ndim == 1
signal = np.asarray(acf)
if even:
signal = np.hstack((signal, signal[:0:-1]))
fft = dt * np.fft.rfft(signal)
if even:
assert np.allclose(fft.imag, 0)
psd = fft.real if even else fft
freqs = np.fft.rfftfreq(len(signal), dt)
freqs = 2 * np.pi * freqs
return freqs, psd
[docs]
def psd_from_acf_2d(
acf: NDArray[float],
dt: Optional[float] = 1,
even: Optional[bool] = True,
) -> NDArray[float]:
"""Computes the power spectral density (PSD) from an auto-correlation function (ACF).
This is the 2D analogue of `psd_from_acf`, where the ACF is provided for
multiple signals, e.g. F(q, t). The Fourier transform is performed along
the last axis.
Parameters
----------
acf
The ACF as a 2D array with shape (N, Nt), where the last axis corresponds
to time lags.
dt
The time step between samples.
even
Whether the ACF is even in time and should be mirrored before computing
the Fourier transform.
Returns
-------
freqs, psd
Angular frequencies ω and corresponding PSD values with shape (N, Nω).
"""
assert acf.ndim == 2
signal = np.asarray(acf)
if even:
signal = np.hstack((signal, signal[:, :0:-1]))
fft = dt * np.fft.rfft(signal, axis=1)
if even:
assert np.allclose(fft.imag, 0)
psd = fft.real if even else fft
freqs = np.fft.rfftfreq(signal.shape[1], dt)
freqs = 2 * np.pi * freqs
return freqs, psd
[docs]
def psd_from_time_signal(
x: NDArray[float],
dt: Optional[float] = 1,
complex: Optional[bool] = False,
) -> NDArray[float]:
"""Computes the power spectral density (PSD) directly from a time signal.
Let x(t) be a time signal. The power spectral density (PSD) can be
computed directly from the Fourier transform of the signal:
``S(ω) = |F[x(t)]|²``
where F denotes the Fourier transform.
This function computes the PSD by performing a discrete Fourier
transform of the time signal and taking the squared magnitude of
the transform.
This is equivalent to computing the PSD from the Fourier transform of the
auto-correlation function.
Parameters
----------
x
Time signal as a 1D array.
dt
Time spacing between consecutive samples.
complex
Whether the time signal is complex. If False (default), the signal is
assumed to be real and the PSD is computed using a real FFT (`rfft`),
returning only non-negative frequencies. If True, the full FFT (`fft`)
is used and both positive and negative frequencies are returned.
Returns
-------
freqs, psd
Angular frequencies ω and corresponding PSD values.
"""
assert x.ndim == 1
signal = np.asarray(x)
N = len(signal)
if complex:
fft = np.fft.fft(signal)
freqs = np.fft.fftfreq(N, dt)
else:
fft = np.fft.rfft(signal)
freqs = np.fft.rfftfreq(N, dt)
psd = dt * np.abs(fft) ** 2 / N
freqs = 2 * np.pi * freqs
return freqs, psd
[docs]
def compute_acf(
Z: NDArray[float],
delta_t: Optional[float] = 1.0,
method: Optional[str] = 'scipy',
) -> NDArray[float]:
r"""
Computes the autocorrelation function (ACF) for a one-dimensional signal :math:`Z` in time as
.. math::
\text{ACF}(\tau) = \frac{\left < Z(t) Z^*(t+\tau) \right >}{\left < Z(t) Z^*(t) \right >}
Here, only the real part of the ACF is returned since if :math:`Z` is complex
the imaginary part should average out to zero for any stationary signal.
Parameters
----------
Z
Complex time signal.
delta_t
Spacing in time between two consecutive values in :math:`Z`.
method
Implementation to use; possible values: `numpy` and `scipy` (default and usually faster).
"""
# keep only real part and normalize
acf = _compute_correlation_function(Z, Z, method)
acf = np.real(acf)
acf /= acf[0]
time_lags = delta_t * np.arange(0, len(acf), 1)
return time_lags, acf
def _compute_correlation_function(Z1, Z2, method: Optional[str] = 'scipy'):
N = len(Z1)
assert len(Z1) == len(Z2)
if method == 'scipy':
cf = correlate(Z1, Z2, mode='full')[N - 1:] / np.arange(N, 0, -1)
elif method == 'numpy':
cf = np.correlate(Z1, Z2, mode='full')[N - 1:] / np.arange(N, 0, -1)
else:
raise ValueError('method must be either numpy or scipy')
return cf
# smoothing functions / FFT filters
# -------------------------------------
[docs]
def gaussian_decay(t: NDArray[float], t_sigma: float) -> NDArray[float]:
r"""
Evaluates a gaussian distribution in time :math:`f(t)`, which can be applied to an ACF in time
to artificially damp it, i.e., forcing it to go to zero for long times.
.. math::
f(t) = \exp{\left [-\frac{1}{2} \left (\frac{t}{t_\mathrm{sigma}}\right )^2 \right ] }
Parameters
----------
t
Time array.
t_sigma
Width (standard deviation of the gaussian) of the decay.
"""
return np.exp(- 1 / 2 * (t / t_sigma) ** 2)
[docs]
def fermi_dirac(t: NDArray[float], t_0: float, t_width: float) -> NDArray[float]:
r"""
Evaluates a Fermi-Dirac-like function in time :math:`f(t)`, which can be applied to an
auto-correlation function (ACF) in time to artificially dampen it, i.e., forcing it to
go to zero for long times without affecting the short-time correlations too much.
.. math::
f(t) = \frac{1}{\exp{[(t-t_0)/t_\mathrm{width}}] + 1}
Parameters
----------
t
Time array.
t_0
Starting time for decay.
t_width
Width of the decay.
"""
return 1.0 / (np.exp((t - t_0) / t_width) + 1)
[docs]
def smoothing_function(
data: NDArray[float],
window_size: int,
window_type: Optional[str] = 'hamming',
) -> NDArray[float]:
"""
Smoothing function for 1D arrays.
This functions employs the pandas rolling window average function.
Parameters
----------
data
1D data array.
window_size
The size of smoothing/smearing window.
window_type
What type of window-shape to use, e.g. ``'blackman'``, ``'hamming'``, ``'boxcar'``
(see pandas and scipy documentaiton for more details).
"""
series = pd.Series(data)
new_data = series.rolling(window_size, win_type=window_type, center=True, min_periods=1).mean()
return np.array(new_data)