Coverage for strongcoca / calculators / polarizability_calculator.py: 100%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-15 18:15 +0000

1import logging 

2 

3from typing import Union 

4import numpy as np 

5from numpy.linalg import solve 

6from .. import CoupledSystem 

7from ..response.utilities import Broadening, GaussianBroadening 

8from ..types import Array 

9from ..units import eV_to_au 

10from .base_calculator import BaseCalculator 

11from .. import env 

12 

13logger = logging.getLogger(__name__) 

14 

15 

16class PolarizabilityCalculator(BaseCalculator): 

17 """Instances of this class enable the calculation of correlation energy 

18 and spectrum of a coupled system, all polarizable objects in which have an 

19 internal representation in the form of a polarizability tensor. 

20 

21 Spectra calculated with this calculator derive broadening from 

22 the underlying polarizable units. Any additional or separate broadening 

23 is not added. 

24 

25 Parameters 

26 ---------- 

27 coupled_system 

28 Coupled system for which to carry out calculations. 

29 imaginary_frequencies 

30 Frequency grid along imaginary axis used for correlation energy 

31 calculation; eV by default, optionally atomic units (see :attr:`units`). 

32 units 

33 `eV` to specify energies in eV or `au` to specify inputs in atomic units. 

34 

35 This parameter determines whether conversion should be performed during 

36 initialization and has no effect on instance methods and variables. 

37 name 

38 Name of response. 

39 

40 Examples 

41 -------- 

42 

43 The following code snippet shows the calculation of the correlation energy 

44 for a simple demo system: 

45 

46 >>> import numpy as np 

47 >>> from strongcoca import CoupledSystem, PolarizableUnit 

48 >>> from strongcoca.response import build_random_casida 

49 >>> from strongcoca.calculators import PolarizabilityCalculator 

50 >>> 

51 >>> # construct coupled system 

52 >>> response = build_random_casida(n_states=2) 

53 >>> pu1 = PolarizableUnit(response, [0, 0, 0]) 

54 >>> pu2 = PolarizableUnit(response, [2, 0, 0]) 

55 >>> cs = CoupledSystem([pu1, pu2]) 

56 >>> 

57 >>> # set up calculator 

58 >>> calc = PolarizabilityCalculator(cs, np.linspace(0, 10, 100)) 

59 >>> 

60 >>> # ... and compute the correlation energy 

61 >>> calc.get_correlation_energy() 

62 -0.72084000362... 

63 """ 

64 

65 def __init__(self, 

66 coupled_system: CoupledSystem, 

67 imaginary_frequencies: Array, 

68 units: str = 'eV', 

69 name: str = 'PolarizabilityCalculator') -> None: 

70 super().__init__(coupled_system, broadening=Broadening(), name=name) 

71 

72 imaginary_frequencies = np.asarray(imaginary_frequencies) 

73 if units == 'eV': 

74 imaginary_frequencies = imaginary_frequencies * eV_to_au 

75 elif units != 'au': 

76 raise ValueError(f"units has to be 'eV' or 'au', not '{units}'") 

77 

78 self._ifreq_w = imaginary_frequencies 

79 self._K_nn: Union[np.ndarray, None] = None 

80 # Check frequency grid 

81 difreq_w = self._ifreq_w[1:] - self._ifreq_w[:-1] # type: ignore 

82 self._difreq = difreq_w[0] 

83 if not np.allclose(difreq_w, self._difreq): 

84 raise ValueError('Frequency grid needs to be equally spaced.') 

85 

86 def _build_coupling_matrix(self) -> np.ndarray: 

87 """Builds coupling matrix. 

88 

89 Coupling matrix is formed of blocks: 

90 Each block is a 3x3 dipole-dipole tensor and there is NxN blocks 

91 in total (N: number of units in coupled system). 

92 The 3x3 blocks on the diagonal are zero. 

93 """ 

94 Ni = len(self._coupled_system) 

95 N3 = 3 * Ni 

96 

97 # Collect positions: (Ni, 3) 

98 pos_iv = np.array([pu._position for pu in self._coupled_system]) 

99 

100 # Pairwise displacement vectors R[i,j] = pos[j] - pos[i], shape (Ni, Ni, 3) 

101 R_ijv = pos_iv[np.newaxis, :] - pos_iv[:, np.newaxis] 

102 

103 # Pairwise distances, shape (Ni, Ni). 

104 # Set diagonal to 1.0 to avoid division by zero; those blocks are zeroed later. 

105 r_ij = np.linalg.norm(R_ijv, axis=-1) 

106 np.fill_diagonal(r_ij, 1.0) 

107 

108 # Dipole-dipole tensor for all pairs: T[i,j] = I/r³ - 3 R_ij R_ij^T / r⁵ 

109 # Shape: (Ni, Ni, 3, 3) 

110 T_ijvw = (np.eye(3) / r_ij[:, :, np.newaxis, np.newaxis] ** 3 

111 - 3.0 * (R_ijv[:, :, :, np.newaxis] * R_ijv[:, :, np.newaxis, :]) 

112 / r_ij[:, :, np.newaxis, np.newaxis] ** 5) 

113 

114 # Zero diagonal blocks (self-interaction) 

115 T_ijvw[np.arange(Ni), np.arange(Ni)] = 0.0 

116 

117 # Reshape (Ni, Ni, 3, 3) → (N3, N3): index [i,j,v,w] → [3i+v, 3j+w] 

118 return T_ijvw.transpose(0, 2, 1, 3).reshape(N3, N3).astype(complex) 

119 

120 def _calculate_correlation_energy(self) -> float: 

121 """Returns the correlation energy of the coupled system in atomic units. 

122 """ 

123 

124 Ni = len(self._coupled_system) 

125 if Ni == 0: 

126 return 0.0 

127 K_nn = self.coupling_matrix 

128 W = len(self._ifreq_w) 

129 

130 # Collect per-unit polarizabilities on CPU: calling response objects is 

131 # inherently sequential Python and cannot be moved to GPU. 

132 dm_wNvv = np.empty((W, Ni, 3, 3), dtype=complex) 

133 for i, pu in enumerate(self._coupled_system): 

134 dm_wNvv[:, i] = pu._get_dynamic_polarizability_imaginary_frequency(self._ifreq_w) 

135 

136 gpu_backend = env.strongcoca_getenv('GPU_BACKEND') 

137 if gpu_backend != 'none': # pragma: no cover 

138 # Fused GPU path: matmul → D = I−chi_K → slogdet → trace. 

139 # K_nn is transferred once; chi_K is never moved back to CPU. 

140 # Frequency chunking keeps peak VRAM at O(W_chunk × N²). 

141 from .gpu import energy_trace as _gpu_energy_trace 

142 precision = env.strongcoca_getenv('GPU_PRECISION') 

143 trace_logD_w, trace_chiK_w = _gpu_energy_trace( 

144 dm_wNvv, K_nn, gpu_backend, precision) 

145 else: 

146 # Vectorized CPU path: single batched matmul then slogdet. 

147 K_block = K_nn.reshape(Ni, 3, 3 * Ni) 

148 chi_K_wnn = (dm_wNvv @ K_block).reshape(W, 3 * Ni, 3 * Ni) 

149 D_wnn = np.eye(3 * Ni)[None] - chi_K_wnn 

150 sign_w, logabsdet_w = np.linalg.slogdet(D_wnn) 

151 trace_logD_w = np.log(sign_w) + logabsdet_w 

152 # tr(chi_K†) = conj(tr(chi_K)) 

153 trace_chiK_w = np.diagonal(chi_K_wnn, axis1=-2, axis2=-1).sum(axis=-1) 

154 

155 integrand_w = trace_logD_w + trace_chiK_w.conj() 

156 integral = np.sum(integrand_w) * self._difreq 

157 energy: float = float(np.real(integral)) / (2 * np.pi) 

158 return energy 

159 

160 @property 

161 def coupling_matrix(self) -> np.ndarray: 

162 r""" The coupling matrix 

163 

164 .. math:: 

165 \boldsymbol{T}_{ij} = 

166 \frac{3\boldsymbol{r}^{(ij)}(\boldsymbol{r}^{(ij)})^\text{T}}{\left|r^{(ij)}\right|^5} 

167 - \frac{1}{\left|r^{(ij)}\right|^3} 

168 

169 as a matrix where the rows and columns each correspond to the system index 

170 :math:`i` or :math:`j` *and* the Cartesian direction. 

171 

172 This quantity is calculated on first use and then buffered. 

173 """ 

174 if self._K_nn is None: 

175 self._K_nn = self._build_coupling_matrix() 

176 return self._K_nn 

177 

178 def _get_dynamic_polarizability(self, frequencies: Array) -> np.ndarray: 

179 if any(isinstance(pu.broadening, GaussianBroadening) 

180 for pu in self._coupled_system): 

181 raise NotImplementedError( 

182 'Gaussian broadening is not supported in PolarizabilityCalculator. ' 

183 'See issue #42.') 

184 

185 freq_w = np.array(frequencies) 

186 Ni = len(self._coupled_system) 

187 

188 # Collect per-unit polarizabilities for ALL frequencies at once. 

189 # Calling _get_dynamic_polarizability with all frequencies is as fast as 

190 # calling it with one frequency due to internal vectorization, so collecting 

191 # upfront avoids repeating the call W times (once per chunk in the old code). 

192 dm_wNvv = np.empty((len(freq_w), Ni, 3, 3), dtype=complex) 

193 for i, pu in enumerate(self._coupled_system): 

194 dm_wNvv[:, i] = pu._get_dynamic_polarizability(freq_w) 

195 

196 gpu_backend = env.strongcoca_getenv('GPU_BACKEND') 

197 if gpu_backend != 'none': # pragma: no cover 

198 # GPU path: fused A construction + solve on GPU, with GPU-memory-aware 

199 # frequency chunking. A matrix never transferred to CPU. 

200 from .gpu import spectrum_solve as _gpu_spectrum_solve 

201 precision = env.strongcoca_getenv('GPU_PRECISION') 

202 return _gpu_spectrum_solve(dm_wNvv, self.coupling_matrix, gpu_backend, precision) 

203 

204 # CPU path: chunk over frequencies to keep A matrix within RAM budget. 

205 dm_wvv = np.zeros((len(freq_w), 3, 3), dtype=complex) 

206 to_MiB = 1024 ** -2 

207 mem_limit = env.get_float('MAX_SOLVE_MEM') / to_MiB 

208 syssize = 3 ** 2 * Ni ** 2 * 8 # size of A matrix per frequency in bytes 

209 chunksize = max(1, int(mem_limit) // syssize) 

210 for indices in np.array_split(np.arange(len(freq_w)), 

211 (len(freq_w) + chunksize - 1) // chunksize): 

212 dm_wvv[indices] = self._get_dynamic_polarizability_chunk(dm_wNvv[indices]) 

213 return dm_wvv 

214 

215 def _get_dynamic_polarizability_chunk(self, dm_wNvv: np.ndarray) -> np.ndarray: 

216 """CPU path: build A and solve for a pre-collected frequency chunk. 

217 

218 Parameters 

219 ---------- 

220 dm_wNvv 

221 Per-unit polarizabilities for this chunk, shape ``(W, Ni, 3, 3)``. 

222 """ 

223 Ni = len(self._coupled_system) 

224 K_nn = self.coupling_matrix 

225 K_block = K_nn.reshape(Ni, 3, 3 * Ni) 

226 W = len(dm_wNvv) 

227 N3 = 3 * Ni 

228 

229 # Build A = I + chi_K by looping over units with all W frequencies batched. 

230 # This makes Ni BLAS calls of shape (W, 3, 3) @ (3, N3) instead of W*Ni 

231 # calls of shape (1, 3, 3) @ (3, N3), which is significantly faster. 

232 A_wnn = np.tile(np.eye(N3, dtype=complex), (W, 1, 1)) 

233 for i in range(Ni): 

234 n0 = i * 3 

235 A_wnn[:, n0:n0 + 3, :] += dm_wNvv[:, i] @ K_block[i] 

236 

237 rhs_wnv = np.ascontiguousarray(dm_wNvv.reshape(W, N3, 3)) 

238 red_wnv = solve(A_wnn, rhs_wnv) 

239 return red_wnv.reshape(W, Ni, 3, 3).sum(axis=1) 

240 

241 def _get_dynamic_polarizability_imaginary_frequency( 

242 self, frequencies: Array) -> np.ndarray: 

243 raise NotImplementedError()