Coverage for dynasor / post_processing / spherical_average.py: 93%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 12:31 +0000

1from copy import deepcopy 

2from typing import Optional 

3import numpy as np 

4from numpy.typing import NDArray 

5from scipy.stats import norm 

6from dynasor.logging_tools import logger 

7from dynasor.sample import Sample 

8 

9 

def get_spherically_averaged_sample_smearing(
        sample: Sample,
        q_norms: NDArray[float],
        q_width: float,
        use_sum: Optional[bool] = False,
        broadening: Optional[str] = 'gaussian',
) -> Sample:
    r"""
    Compute a spherical average over q-points for all the correlation functions in :attr:`sample`.

    Every q-point contributes to the function value at a given :math:`\boldsymbol{q}` with a
    weight set by a broadening function, e.g.,

    .. math::

        F(q) = \sum_i w(\boldsymbol{q}_i, q) F(\boldsymbol{q}_i)

    where the weights satisfy :math:`\sum_i w(\boldsymbol{q}_i, q) = 1`
    (when :attr:`use_sum` is ``False``).

    Two broadening functions can be selected via :attr:`broadening`:

    **Gaussian** (``'gaussian'``):

    .. math::

        w(\boldsymbol{q}_i, q) \propto \exp{\left [ -\frac{1}{2} \left ( \frac{|\boldsymbol{q}_i|
        - q}{q_{width}} \right)^2 \right ]}

    where :attr:`q_width` is the standard deviation :math:`\sigma`.

    **Lorentzian** (``'lorentzian'``):

    .. math::

        w(\boldsymbol{q}_i, q) \propto \frac{1}{\left(|\boldsymbol{q}_i| - q\right)^2
        + q_{width}^2}

    where :attr:`q_width` is the half-width at half-maximum :math:`\gamma`.

    Parameters
    ----------
    sample
        Input sample.
    q_norms
        Values of :math:`|\vec{q}|` at which to evaluate the correlation functions.
    q_width
        Width of the broadening function. Standard deviation :math:`\sigma` for Gaussian;
        half-width at half-maximum :math:`\gamma` for Lorentzian.
    use_sum
        Whether to average or sum the sample in each bin.
    broadening
        Broadening function to use. Either ``'gaussian'`` (default) or ``'lorentzian'``.
    """
    if not isinstance(sample, Sample):
        raise ValueError('Input sample is not a Sample object.')

    if broadening not in ('gaussian', 'lorentzian'):
        raise ValueError(f"broadening must be 'gaussian' or 'lorentzian', got '{broadening}'")

    # q-points must form an (Nq, 3) array
    q_points = sample.q_points
    if q_points.shape[1] != 3:
        raise ValueError('q-points array has the wrong shape.')

    # input dict for the new Sample: drop q_points, q_norms is added below
    data_dict = {key: sample[key] for key in sample.dimensions if key != 'q_points'}

    # select the kernel functions matching the requested broadening
    kernels = {
        'gaussian': (_get_gaussian_average, _get_gaussian_sum),
        'lorentzian': (_get_lorentzian_average, _get_lorentzian_sum),
    }
    get_average, get_sum = kernels[broadening]
    smear = get_sum if use_sum else get_average

    # spherically average every available correlation function
    for key in sample.available_correlation_functions:
        correlation = getattr(sample, key)
        data_dict[key] = smear(q_points, correlation, q_norms, q_width)
    data_dict['q_norms'] = q_norms

    # compose new object
    new_sample = sample.__class__(
        data_dict,
        simulation_data=deepcopy(sample.simulation_data),
        history=deepcopy(sample.history))
    new_sample._append_history(
        'get_spherically_averaged_sample_smearing',
        dict(
            q_width=q_width,
            use_sum=use_sum,
            broadening=broadening,
        ))

    return new_sample

111 

112 

def get_spherically_averaged_sample_binned(
        sample: Sample,
        num_q_bins: int,
        use_sum: Optional[bool] = False,
) -> Sample:
    r"""
    Compute a spherical average over q-points for all the correlation functions in :attr:`sample`.

    A q-binning approach is used for the spherical average: every q-point is
    assigned to a spherical shell (bin), and the value of each correlation
    function in a bin is the average over all q-points it contains.
    Bins without any q-points are assigned `np.nan`.
    The bin range boundaries, `q_min` and `q_max`, are taken as the minimum and
    maximum, respectively, of `|q_points|`, and become the bin centers of the
    first and last bins, respectively.
    The number of bins is controlled by the input parameter :attr:`num_q_bins`.

    Parameters
    ----------
    sample
        Input sample.
    num_q_bins
        Number of q-bins to use.
    use_sum
        Whether to average or sum the sample in each bin.
    """

    if not isinstance(sample, Sample):
        raise ValueError('Input sample is not a Sample object.')

    # q-points must form an (Nq, 3) array
    q_points = sample.q_points
    if q_points.shape[1] != 3:
        raise ValueError('q-points array has wrong shape.')

    # input dict for the new Sample: drop q_points, q_norms is added below
    data_dict = {key: sample[key] for key in sample.dimensions if key != 'q_points'}

    # spherically average every available correlation function
    for key in sample.available_correlation_functions:
        correlation = getattr(sample, key)
        q_bincenters, _, binned = _get_bin_average(q_points, correlation, num_q_bins, use_sum)
        data_dict[key] = binned
    data_dict['q_norms'] = q_bincenters

    # compose new sample
    new_sample = sample.__class__(
        data_dict,
        simulation_data=deepcopy(sample.simulation_data),
        history=deepcopy(sample.history))
    new_sample._append_history(
        'get_spherically_averaged_sample_binned',
        dict(
            num_q_bins=num_q_bins,
            use_sum=use_sum,
        ))

    return new_sample

175 

176 

177def _get_gaussian_average( 

178 q_points: NDArray[float], 

179 Z: NDArray[float], 

180 q_norms: NDArray[float], 

181 q_width: float, 

182) -> NDArray[float]: 

183 q_norms_sample = np.linalg.norm(q_points, axis=1) 

184 # weights shape: (N_q_out, N_qpoints); normalization constant cancels so omit it 

185 diff = q_norms[:, None] - q_norms_sample[None, :] 

186 weights = np.exp(-0.5 * (diff / q_width) ** 2) 

187 norms = weights.sum(axis=1, keepdims=True) 

188 weights /= np.where(norms != 0, norms, 1.0) 

189 return weights @ Z 

190 

191 

192def _get_gaussian_sum( 

193 q_points: NDArray[float], 

194 Z: NDArray[float], 

195 q_norms: NDArray[float], 

196 q_width: float, 

197) -> NDArray[float]: 

198 q_norms_sample = np.linalg.norm(q_points, axis=1) 

199 diff = q_norms[:, None] - q_norms_sample[None, :] 

200 weights = np.exp(-0.5 * (diff / q_width) ** 2) / (q_width * np.sqrt(2 * np.pi)) 

201 return weights @ Z 

202 

203 

204def _gaussian(x: NDArray[float], x0: float, sigma: float) -> NDArray[float]: 

205 dist = norm(loc=x0, scale=sigma) 

206 return dist.pdf(x) 

207 

208 

209def _get_lorentzian_average( 

210 q_points: NDArray[float], 

211 Z: NDArray[float], 

212 q_norms: NDArray[float], 

213 q_width: float, 

214) -> NDArray[float]: 

215 q_norms_sample = np.linalg.norm(q_points, axis=1) 

216 # weights shape: (N_q_out, N_qpoints); normalization constant cancels so omit it 

217 diff = q_norms[:, None] - q_norms_sample[None, :] 

218 weights = 1.0 / (diff ** 2 + q_width ** 2) 

219 norms = weights.sum(axis=1, keepdims=True) 

220 weights /= np.where(norms != 0, norms, 1.0) 

221 return weights @ Z 

222 

223 

224def _get_lorentzian_sum( 

225 q_points: NDArray[float], 

226 Z: NDArray[float], 

227 q_norms: NDArray[float], 

228 q_width: float, 

229) -> NDArray[float]: 

230 q_norms_sample = np.linalg.norm(q_points, axis=1) 

231 diff = q_norms[:, None] - q_norms_sample[None, :] 

232 weights = q_width / (np.pi * (diff ** 2 + q_width ** 2)) 

233 return weights @ Z 

234 

235 

236def _lorentzian(x: NDArray[float], x0: float, gamma: float) -> NDArray[float]: 

237 return gamma / (np.pi * ((x - x0) ** 2 + gamma ** 2)) 

238 

239 

def _get_bin_average(
    q_points: NDArray[float],
    data: NDArray[float],
    num_q_bins: int,
    use_sum: Optional[bool] = False,
) -> tuple[NDArray[float], NDArray[int], NDArray[float]]:
    """
    Compute a spherical average over q-points for the data using q-bins.

    If a q-bin does not contain any q-points, then a np.nan is inserted.

    The bin range boundaries, q_min and q_max, are determined from the minimum
    and maximum of |q_points|, and are set as the bin centers of the first and
    last bins, respectively.

    Parameters
    ----------
    q_points
        Array of q-points shape ``(Nq, 3)``.
    data
        Array of shape ``(Nq, N)``, shape cannot be ``(Nq, )``.
    num_q_bins
        Number of radial q-point bins to use; must be at least 2.
    use_sum
        Whether or not to sum the data in each bin.

    Returns
    -------
    Tuple comprising the array of |q| bin centers of shape ``(num_q_bins, )``,
    the number of q-points in each bin of shape ``(num_q_bins, )``,
    and the averaged data array of shape ``(num_q_bins, N)``.

    Raises
    ------
    ValueError
        If :attr:`num_q_bins` is smaller than 2.
    """
    # fewer than 2 bins would make the bin width below ill-defined
    if num_q_bins < 2:
        raise ValueError('num_q_bins must be at least 2.')

    N_qpoints = q_points.shape[0]
    N_t = data.shape[1]
    assert q_points.shape[1] == 3
    assert data.shape[0] == N_qpoints

    # q-norms
    q_norms = np.linalg.norm(q_points, axis=1)
    assert q_norms.shape == (N_qpoints,)

    # set up bins such that q_min and q_max sit at the centers of the first
    # and last bins
    q_max = np.max(q_norms)
    q_min = np.min(q_norms)
    delta_x = (q_max - q_min) / (num_q_bins - 1)
    q_range = (q_min - delta_x / 2, q_max + delta_x / 2)
    bin_counts, edges = np.histogram(q_norms, bins=num_q_bins, range=q_range)
    q_bincenters = 0.5 * (edges[1:] + edges[:-1])

    # calculate average for each bin
    averaged_data = np.zeros((num_q_bins, N_t))
    for bin_index in range(num_q_bins):
        # find q-indices that belong to this bin; the half-open check matches
        # np.histogram because the last edge lies strictly above q_max
        bin_min = edges[bin_index]
        bin_max = edges[bin_index + 1]
        bin_count = bin_counts[bin_index]
        q_indices = np.where(np.logical_and(q_norms >= bin_min, q_norms < bin_max))[0]
        assert len(q_indices) == bin_count
        logger.debug(f'bin {bin_index} contains {bin_count} q-points')

        # average (or sum) over q-indices; empty bins are filled with np.nan
        if bin_count == 0:
            logger.warning(f'No q-points for bin {bin_index}')
            data_bin = np.full(N_t, np.nan)
        else:
            if use_sum:
                data_bin = data[q_indices, :].sum(axis=0)
            else:
                data_bin = data[q_indices, :].mean(axis=0)
        averaged_data[bin_index, :] = data_bin

    return q_bincenters, bin_counts, averaged_data