Coverage for dynasor / tools / acfs.py: 99%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 06:28 +0000

1""" 

2A number of utility functions, for example for dealing with 

3autocorrelation functions, Fourier transforms, and smoothing. 

4""" 

5 

6from typing import Optional 

7import numpy as np 

8from scipy.signal import correlate 

9from numpy.typing import NDArray 

10import pandas as pd 

11 

12 

13def psd_from_acf( 

14 acf: NDArray[float], 

15 dt: Optional[float] = 1, 

16 even: Optional[bool] = True, 

17) -> NDArray[float]: 

18 """Computes the power spectral density (PSD) from an auto-correlation function (ACF). 

19 

20 Let x(t) be a time signal and define its auto-correlation function 

21 

22 C(τ) = ⟨ x(t) x(t+τ) ⟩ 

23 

24 where ⟨·⟩ denotes a time or ensemble average. According to the 

25 Wiener–Khinchin theorem, the power spectral density (PSD) of x(t) is 

26 the Fourier transform of its auto-correlation function: 

27 

28 S(ω) = ∫ C(τ) e^{-i ω τ} dτ 

29 

30 This function computes the PSD by performing a discrete Fourier 

31 transform of the provided ACF. 

32 

33 Parameters 

34 ---------- 

35 acf 

36 The auto-correlation function C(τ) evaluated at discrete time lags. 

37 dt 

38 Time spacing between consecutive samples in the ACF. 

39 even 

40 Whether the ACF is assumed to be even in time. If True, the ACF 

41 is mirrored to construct a two-sided correlation function before 

42 computing the Fourier transform. 

43 

44 Returns 

45 ------- 

46 freqs, psd 

47 Angular frequencies ω and corresponding PSD values. 

48 """ 

49 assert acf.ndim == 1 

50 

51 signal = np.asarray(acf) 

52 

53 if even: 

54 signal = np.hstack((signal, signal[:0:-1])) 

55 

56 fft = dt * np.fft.rfft(signal) 

57 psd = fft.real if even else fft 

58 

59 freqs = np.fft.rfftfreq(len(signal), dt) 

60 freqs = 2 * np.pi * freqs 

61 

62 return freqs, psd 

63 

64 

65def psd_from_acf_2d( 

66 acf: NDArray[float], 

67 dt: Optional[float] = 1, 

68 even: Optional[bool] = True, 

69) -> NDArray[float]: 

70 """Computes the power spectral density (PSD) from an auto-correlation function (ACF). 

71 

72 This is the 2D analogue of `psd_from_acf`, where the ACF is provided for 

73 multiple signals, e.g. F(q, t). The Fourier transform is performed along 

74 the last axis. 

75 

76 Parameters 

77 ---------- 

78 acf 

79 The ACF as a 2D array with shape (N, Nt), where the last axis corresponds 

80 to time lags. 

81 dt 

82 The time step between samples. 

83 even 

84 Whether the ACF is even in time and should be mirrored before computing 

85 the Fourier transform. 

86 

87 Returns 

88 ------- 

89 freqs, psd 

90 Angular frequencies ω and corresponding PSD values with shape (N, Nω). 

91 """ 

92 assert acf.ndim == 2 

93 

94 signal = np.asarray(acf) 

95 

96 if even: 

97 signal = np.hstack((signal, signal[:, :0:-1])) 

98 

99 fft = dt * np.fft.rfft(signal, axis=1) 

100 psd = fft.real if even else fft 

101 

102 freqs = np.fft.rfftfreq(signal.shape[1], dt) 

103 freqs = 2 * np.pi * freqs 

104 

105 return freqs, psd 

106 

107 

108def psd_from_time_signal( 

109 x: NDArray[float], 

110 dt: Optional[float] = 1, 

111 complex: Optional[bool] = False, 

112) -> NDArray[float]: 

113 """Computes the power spectral density (PSD) directly from a time signal. 

114 

115 Let x(t) be a time signal. The power spectral density (PSD) can be 

116 computed directly from the Fourier transform of the signal: 

117 

118 ``S(ω) = |F[x(t)]|²`` 

119 

120 where F denotes the Fourier transform. 

121 

122 This function computes the PSD by performing a discrete Fourier 

123 transform of the time signal and taking the squared magnitude of 

124 the transform. 

125 

126 This is equivalent to computing the PSD from the Fourier transform of the 

127 auto-correlation function. 

128 

129 Parameters 

130 ---------- 

131 x 

132 Time signal as a 1D array. 

133 dt 

134 Time spacing between consecutive samples. 

135 complex 

136 Whether the time signal is complex. If False (default), the signal is 

137 assumed to be real and the PSD is computed using a real FFT (`rfft`), 

138 returning only non-negative frequencies. If True, the full FFT (`fft`) 

139 is used and both positive and negative frequencies are returned. 

140 

141 Returns 

142 ------- 

143 freqs, psd 

144 Angular frequencies ω and corresponding PSD values. 

145 """ 

146 assert x.ndim == 1 

147 

148 signal = np.asarray(x) 

149 N = len(signal) 

150 

151 if complex: 

152 fft = np.fft.fft(signal) 

153 freqs = np.fft.fftfreq(N, dt) 

154 else: 

155 fft = np.fft.rfft(signal) 

156 freqs = np.fft.rfftfreq(N, dt) 

157 

158 psd = dt * np.abs(fft) ** 2 / N 

159 freqs = 2 * np.pi * freqs 

160 

161 return freqs, psd 

162 

163 

164def compute_acf( 

165 Z: NDArray[float], 

166 delta_t: Optional[float] = 1.0, 

167 method: Optional[str] = 'scipy', 

168) -> NDArray[float]: 

169 r""" 

170 Computes the autocorrelation function (ACF) for a one-dimensional signal :math:`Z` in time as 

171 

172 .. math:: 

173 

174 \text{ACF}(\tau) = \frac{\left < Z(t) Z^*(t+\tau) \right >}{\left < Z(t) Z^*(t) \right >} 

175 

176 Here, only the real part of the ACF is returned since if :math:`Z` is complex 

177 the imaginary part should average out to zero for any stationary signal. 

178 

179 Parameters 

180 ---------- 

181 Z 

182 Complex time signal. 

183 delta_t 

184 Spacing in time between two consecutive values in :math:`Z`. 

185 method 

186 Implementation to use; possible values: `numpy` and `scipy` (default and usually faster). 

187 """ 

188 

189 # keep only real part and normalize 

190 acf = _compute_correlation_function(Z, Z, method) 

191 acf = np.real(acf) 

192 acf /= acf[0] 

193 

194 time_lags = delta_t * np.arange(0, len(acf), 1) 

195 return time_lags, acf 

196 

197 

198def _compute_correlation_function(Z1, Z2, method: Optional[str] = 'scipy'): 

199 N = len(Z1) 

200 assert len(Z1) == len(Z2) 

201 if method == 'scipy': 

202 cf = correlate(Z1, Z2, mode='full')[N - 1:] / np.arange(N, 0, -1) 

203 elif method == 'numpy': 

204 cf = np.correlate(Z1, Z2, mode='full')[N - 1:] / np.arange(N, 0, -1) 

205 else: 

206 raise ValueError('method must be either numpy or scipy') 

207 return cf 

208 

209 

210# smoothing functions / FFT filters 

211# ------------------------------------- 

212def gaussian_decay(t: NDArray[float], t_sigma: float) -> NDArray[float]: 

213 r""" 

214 Evaluates a gaussian distribution in time :math:`f(t)`, which can be applied to an ACF in time 

215 to artificially damp it, i.e., forcing it to go to zero for long times. 

216 

217 .. math:: 

218 

219 f(t) = \exp{\left [-\frac{1}{2} \left (\frac{t}{t_\mathrm{sigma}}\right )^2 \right ] } 

220 

221 Parameters 

222 ---------- 

223 t 

224 Time array. 

225 t_sigma 

226 Width (standard deviation of the gaussian) of the decay. 

227 """ 

228 

229 return np.exp(- 1 / 2 * (t / t_sigma) ** 2) 

230 

231 

232def fermi_dirac(t: NDArray[float], t_0: float, t_width: float) -> NDArray[float]: 

233 r""" 

234 Evaluates a Fermi-Dirac-like function in time :math:`f(t)`, which can be applied to an 

235 auto-correlation function (ACF) in time to artificially dampen it, i.e., forcing it to 

236 go to zero for long times without affecting the short-time correlations too much. 

237 

238 .. math:: 

239 

240 f(t) = \frac{1}{\exp{[(t-t_0)/t_\mathrm{width}}] + 1} 

241 

242 Parameters 

243 ---------- 

244 t 

245 Time array. 

246 t_0 

247 Starting time for decay. 

248 t_width 

249 Width of the decay. 

250 

251 """ 

252 return 1.0 / (np.exp((t - t_0) / t_width) + 1) 

253 

254 

255def smoothing_function( 

256 data: NDArray[float], 

257 window_size: int, 

258 window_type: Optional[str] = 'hamming', 

259) -> NDArray[float]: 

260 """ 

261 Smoothing function for 1D arrays. 

262 This functions employs the pandas rolling window average function. 

263 

264 Parameters 

265 ---------- 

266 data 

267 1D data array. 

268 window_size 

269 The size of smoothing/smearing window. 

270 window_type 

271 What type of window-shape to use, e.g. ``'blackman'``, ``'hamming'``, ``'boxcar'`` 

272 (see pandas and scipy documentaiton for more details). 

273 

274 """ 

275 series = pd.Series(data) 

276 new_data = series.rolling(window_size, win_type=window_type, center=True, min_periods=1).mean() 

277 return np.array(new_data)