Coverage for dynasor / tools / acfs.py: 67%

40 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 12:31 +0000

1""" 

2A number of utility functions, for example for dealing with 

3autocorrelation functions, Fourier transforms, and smoothing. 

4""" 

5 

6from typing import Optional 

7import numpy as np 

8from scipy.signal import correlate 

9from numpy.typing import NDArray 

10import pandas as pd 

11 

12 

13def psd_from_acf( 

14 acf: NDArray[float], 

15 dt: Optional[float] = 1, 

16 angular: Optional[bool] = True, 

17 even: Optional[bool] = True, 

18) -> NDArray[float]: 

19 """Computes the power spectral density (PSD) from an auto-correlation function (ACF). 

20 

21 Parameters 

22 ---------- 

23 acf 

24 The ACF as an array. 

25 dt 

26 The time step between samples. 

27 angular 

28 Whether to return normal or angular frequencies. 

29 even 

30 Whether to mirror the ACF and force the PSD to be purely real. 

31 """ 

32 assert acf.ndim == 1 

33 signal = np.array(acf) 

34 if even: 

35 signal = np.hstack((signal, signal[:0:-1])) 

36 fft = np.fft.fft(signal) 

37 if even: 

38 assert np.allclose(fft.imag, 0) 

39 fft = fft.real if even else fft 

40 

41 freqs = np.fft.fftfreq(len(signal), dt) 

42 freqs = 2*np.pi * freqs if angular else freqs 

43 

44 return freqs, fft 

45 

46 

47def compute_acf( 

48 Z: NDArray[float], 

49 delta_t: Optional[float] = 1.0, 

50 method: Optional[str] = 'scipy', 

51) -> NDArray[float]: 

52 r""" 

53 Computes the autocorrelation function (ACF) for a one-dimensional signal :math:`Z` in time as 

54 

55 .. math:: 

56 

57 \text{ACF}(\tau) = \frac{\left < Z(t) Z^*(t+\tau) \right >}{\left < Z(t) Z^*(t) \right >} 

58 

59 Here, only the real part of the ACF is returned since if :math:`Z` is complex 

60 the imaginary part should average out to zero for any stationary signal. 

61 

62 Parameters 

63 ---------- 

64 Z 

65 Complex time signal. 

66 delta_t 

67 Spacing in time between two consecutive values in :math:`Z`. 

68 method 

69 Implementation to use; possible values: `numpy` and `scipy` (default and usually faster). 

70 """ 

71 

72 # keep only real part and normalize 

73 acf = _compute_correlation_function(Z, Z, method) 

74 acf = np.real(acf) 

75 acf /= acf[0] 

76 

77 time_lags = delta_t * np.arange(0, len(acf), 1) 

78 return time_lags, acf 

79 

80 

81def _compute_correlation_function(Z1, Z2, method: Optional[str] = 'scipy'): 

82 N = len(Z1) 

83 assert len(Z1) == len(Z2) 

84 if method == 'scipy': 

85 cf = correlate(Z1, Z2, mode='full')[N - 1:] / np.arange(N, 0, -1) 

86 elif method == 'numpy': 

87 cf = np.correlate(Z1, Z2, mode='full')[N - 1:] / np.arange(N, 0, -1) 

88 else: 

89 raise ValueError('method must be either numpy or scipy') 

90 return cf 

91 

92 

93# smoothing functions / FFT filters 

94# ------------------------------------- 

95def gaussian_decay(t: NDArray[float], t_sigma: float) -> NDArray[float]: 

96 r""" 

97 Evaluates a gaussian distribution in time :math:`f(t)`, which can be applied to an ACF in time 

98 to artificially damp it, i.e., forcing it to go to zero for long times. 

99 

100 .. math:: 

101 

102 f(t) = \exp{\left [-\frac{1}{2} \left (\frac{t}{t_\mathrm{sigma}}\right )^2 \right ] } 

103 

104 Parameters 

105 ---------- 

106 t 

107 Time array. 

108 t_sigma 

109 Width (standard deviation of the gaussian) of the decay. 

110 """ 

111 

112 return np.exp(- 1 / 2 * (t / t_sigma) ** 2) 

113 

114 

115def fermi_dirac(t: NDArray[float], t_0: float, t_width: float) -> NDArray[float]: 

116 r""" 

117 Evaluates a Fermi-Dirac-like function in time :math:`f(t)`, which can be applied to an 

118 auto-correlation function (ACF) in time to artificially dampen it, i.e., forcing it to 

119 go to zero for long times without affecting the short-time correlations too much. 

120 

121 .. math:: 

122 

123 f(t) = \frac{1}{\exp{[(t-t_0)/t_\mathrm{width}}] + 1} 

124 

125 Parameters 

126 ---------- 

127 t 

128 Time array. 

129 t_0 

130 Starting time for decay. 

131 t_width 

132 Width of the decay. 

133 

134 """ 

135 return 1.0 / (np.exp((t - t_0) / t_width) + 1) 

136 

137 

138def smoothing_function( 

139 data: NDArray[float], 

140 window_size: int, 

141 window_type: Optional[str] = 'hamming', 

142) -> NDArray[float]: 

143 """ 

144 Smoothing function for 1D arrays. 

145 This functions employs the pandas rolling window average function. 

146 

147 Parameters 

148 ---------- 

149 data 

150 1D data array. 

151 window_size 

152 The size of smoothing/smearing window. 

153 window_type 

154 What type of window-shape to use, e.g. ``'blackman'``, ``'hamming'``, ``'boxcar'`` 

155 (see pandas and scipy documentaiton for more details). 

156 

157 """ 

158 series = pd.Series(data) 

159 new_data = series.rolling(window_size, win_type=window_type, center=True, min_periods=1).mean() 

160 return np.array(new_data)