Coverage for strongcoca / calculators / polarizability_calculator.py: 100%
90 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-15 18:15 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-15 18:15 +0000
1import logging
3from typing import Union
4import numpy as np
5from numpy.linalg import solve
6from .. import CoupledSystem
7from ..response.utilities import Broadening, GaussianBroadening
8from ..types import Array
9from ..units import eV_to_au
10from .base_calculator import BaseCalculator
11from .. import env
13logger = logging.getLogger(__name__)
16class PolarizabilityCalculator(BaseCalculator):
17 """Instances of this class enable the calculation of correlation energy
18 and spectrum of a coupled system, all polarizable objects in which have an
19 internal representation in the form of a polarizability tensor.
21 Spectra calculated with this calculator derive broadening from
22 the underlying polarizable units. Any additional or separate broadening
23 is not added.
25 Parameters
26 ----------
27 coupled_system
28 Coupled system for which to carry out calculations.
29 imaginary_frequencies
30 Frequency grid along imaginary axis used for correlation energy
31 calculation; eV by default, optionally atomic units (see :attr:`units`).
32 units
33 `eV` to specify energies in eV or `au` to specify inputs in atomic units.
35 This parameter determines whether conversion should be performed during
36 initialization and has no effect on instance methods and variables.
37 name
38 Name of response.
40 Examples
41 --------
43 The following code snippet shows the calculation of the correlation energy
44 for a simple demo system:
46 >>> import numpy as np
47 >>> from strongcoca import CoupledSystem, PolarizableUnit
48 >>> from strongcoca.response import build_random_casida
49 >>> from strongcoca.calculators import PolarizabilityCalculator
50 >>>
51 >>> # construct coupled system
52 >>> response = build_random_casida(n_states=2)
53 >>> pu1 = PolarizableUnit(response, [0, 0, 0])
54 >>> pu2 = PolarizableUnit(response, [2, 0, 0])
55 >>> cs = CoupledSystem([pu1, pu2])
56 >>>
57 >>> # set up calculator
58 >>> calc = PolarizabilityCalculator(cs, np.linspace(0, 10, 100))
59 >>>
60 >>> # ... and compute the correlation energy
61 >>> calc.get_correlation_energy()
62 -0.72084000362...
63 """
65 def __init__(self,
66 coupled_system: CoupledSystem,
67 imaginary_frequencies: Array,
68 units: str = 'eV',
69 name: str = 'PolarizabilityCalculator') -> None:
70 super().__init__(coupled_system, broadening=Broadening(), name=name)
72 imaginary_frequencies = np.asarray(imaginary_frequencies)
73 if units == 'eV':
74 imaginary_frequencies = imaginary_frequencies * eV_to_au
75 elif units != 'au':
76 raise ValueError(f"units has to be 'eV' or 'au', not '{units}'")
78 self._ifreq_w = imaginary_frequencies
79 self._K_nn: Union[np.ndarray, None] = None
80 # Check frequency grid
81 difreq_w = self._ifreq_w[1:] - self._ifreq_w[:-1] # type: ignore
82 self._difreq = difreq_w[0]
83 if not np.allclose(difreq_w, self._difreq):
84 raise ValueError('Frequency grid needs to be equally spaced.')
86 def _build_coupling_matrix(self) -> np.ndarray:
87 """Builds coupling matrix.
89 Coupling matrix is formed of blocks:
90 Each block is a 3x3 dipole-dipole tensor and there is NxN blocks
91 in total (N: number of units in coupled system).
92 The 3x3 blocks on the diagonal are zero.
93 """
94 Ni = len(self._coupled_system)
95 N3 = 3 * Ni
97 # Collect positions: (Ni, 3)
98 pos_iv = np.array([pu._position for pu in self._coupled_system])
100 # Pairwise displacement vectors R[i,j] = pos[j] - pos[i], shape (Ni, Ni, 3)
101 R_ijv = pos_iv[np.newaxis, :] - pos_iv[:, np.newaxis]
103 # Pairwise distances, shape (Ni, Ni).
104 # Set diagonal to 1.0 to avoid division by zero; those blocks are zeroed later.
105 r_ij = np.linalg.norm(R_ijv, axis=-1)
106 np.fill_diagonal(r_ij, 1.0)
108 # Dipole-dipole tensor for all pairs: T[i,j] = I/r³ - 3 R_ij R_ij^T / r⁵
109 # Shape: (Ni, Ni, 3, 3)
110 T_ijvw = (np.eye(3) / r_ij[:, :, np.newaxis, np.newaxis] ** 3
111 - 3.0 * (R_ijv[:, :, :, np.newaxis] * R_ijv[:, :, np.newaxis, :])
112 / r_ij[:, :, np.newaxis, np.newaxis] ** 5)
114 # Zero diagonal blocks (self-interaction)
115 T_ijvw[np.arange(Ni), np.arange(Ni)] = 0.0
117 # Reshape (Ni, Ni, 3, 3) → (N3, N3): index [i,j,v,w] → [3i+v, 3j+w]
118 return T_ijvw.transpose(0, 2, 1, 3).reshape(N3, N3).astype(complex)
120 def _calculate_correlation_energy(self) -> float:
121 """Returns the correlation energy of the coupled system in atomic units.
122 """
124 Ni = len(self._coupled_system)
125 if Ni == 0:
126 return 0.0
127 K_nn = self.coupling_matrix
128 W = len(self._ifreq_w)
130 # Collect per-unit polarizabilities on CPU: calling response objects is
131 # inherently sequential Python and cannot be moved to GPU.
132 dm_wNvv = np.empty((W, Ni, 3, 3), dtype=complex)
133 for i, pu in enumerate(self._coupled_system):
134 dm_wNvv[:, i] = pu._get_dynamic_polarizability_imaginary_frequency(self._ifreq_w)
136 gpu_backend = env.strongcoca_getenv('GPU_BACKEND')
137 if gpu_backend != 'none': # pragma: no cover
138 # Fused GPU path: matmul → D = I−chi_K → slogdet → trace.
139 # K_nn is transferred once; chi_K is never moved back to CPU.
140 # Frequency chunking keeps peak VRAM at O(W_chunk × N²).
141 from .gpu import energy_trace as _gpu_energy_trace
142 precision = env.strongcoca_getenv('GPU_PRECISION')
143 trace_logD_w, trace_chiK_w = _gpu_energy_trace(
144 dm_wNvv, K_nn, gpu_backend, precision)
145 else:
146 # Vectorized CPU path: single batched matmul then slogdet.
147 K_block = K_nn.reshape(Ni, 3, 3 * Ni)
148 chi_K_wnn = (dm_wNvv @ K_block).reshape(W, 3 * Ni, 3 * Ni)
149 D_wnn = np.eye(3 * Ni)[None] - chi_K_wnn
150 sign_w, logabsdet_w = np.linalg.slogdet(D_wnn)
151 trace_logD_w = np.log(sign_w) + logabsdet_w
152 # tr(chi_K†) = conj(tr(chi_K))
153 trace_chiK_w = np.diagonal(chi_K_wnn, axis1=-2, axis2=-1).sum(axis=-1)
155 integrand_w = trace_logD_w + trace_chiK_w.conj()
156 integral = np.sum(integrand_w) * self._difreq
157 energy: float = float(np.real(integral)) / (2 * np.pi)
158 return energy
160 @property
161 def coupling_matrix(self) -> np.ndarray:
162 r""" The coupling matrix
164 .. math::
165 \boldsymbol{T}_{ij} =
166 \frac{3\boldsymbol{r}^{(ij)}(\boldsymbol{r}^{(ij)})^\text{T}}{\left|r^{(ij)}\right|^5}
167 - \frac{1}{\left|r^{(ij)}\right|^3}
169 as a matrix where the rows and columns each correspond to the system index
170 :math:`i` or :math:`j` *and* the Cartesian direction.
172 This quantity is calculated on first use and then buffered.
173 """
174 if self._K_nn is None:
175 self._K_nn = self._build_coupling_matrix()
176 return self._K_nn
178 def _get_dynamic_polarizability(self, frequencies: Array) -> np.ndarray:
179 if any(isinstance(pu.broadening, GaussianBroadening)
180 for pu in self._coupled_system):
181 raise NotImplementedError(
182 'Gaussian broadening is not supported in PolarizabilityCalculator. '
183 'See issue #42.')
185 freq_w = np.array(frequencies)
186 Ni = len(self._coupled_system)
188 # Collect per-unit polarizabilities for ALL frequencies at once.
189 # Calling _get_dynamic_polarizability with all frequencies is as fast as
190 # calling it with one frequency due to internal vectorization, so collecting
191 # upfront avoids repeating the call W times (once per chunk in the old code).
192 dm_wNvv = np.empty((len(freq_w), Ni, 3, 3), dtype=complex)
193 for i, pu in enumerate(self._coupled_system):
194 dm_wNvv[:, i] = pu._get_dynamic_polarizability(freq_w)
196 gpu_backend = env.strongcoca_getenv('GPU_BACKEND')
197 if gpu_backend != 'none': # pragma: no cover
198 # GPU path: fused A construction + solve on GPU, with GPU-memory-aware
199 # frequency chunking. A matrix never transferred to CPU.
200 from .gpu import spectrum_solve as _gpu_spectrum_solve
201 precision = env.strongcoca_getenv('GPU_PRECISION')
202 return _gpu_spectrum_solve(dm_wNvv, self.coupling_matrix, gpu_backend, precision)
204 # CPU path: chunk over frequencies to keep A matrix within RAM budget.
205 dm_wvv = np.zeros((len(freq_w), 3, 3), dtype=complex)
206 to_MiB = 1024 ** -2
207 mem_limit = env.get_float('MAX_SOLVE_MEM') / to_MiB
208 syssize = 3 ** 2 * Ni ** 2 * 8 # size of A matrix per frequency in bytes
209 chunksize = max(1, int(mem_limit) // syssize)
210 for indices in np.array_split(np.arange(len(freq_w)),
211 (len(freq_w) + chunksize - 1) // chunksize):
212 dm_wvv[indices] = self._get_dynamic_polarizability_chunk(dm_wNvv[indices])
213 return dm_wvv
215 def _get_dynamic_polarizability_chunk(self, dm_wNvv: np.ndarray) -> np.ndarray:
216 """CPU path: build A and solve for a pre-collected frequency chunk.
218 Parameters
219 ----------
220 dm_wNvv
221 Per-unit polarizabilities for this chunk, shape ``(W, Ni, 3, 3)``.
222 """
223 Ni = len(self._coupled_system)
224 K_nn = self.coupling_matrix
225 K_block = K_nn.reshape(Ni, 3, 3 * Ni)
226 W = len(dm_wNvv)
227 N3 = 3 * Ni
229 # Build A = I + chi_K by looping over units with all W frequencies batched.
230 # This makes Ni BLAS calls of shape (W, 3, 3) @ (3, N3) instead of W*Ni
231 # calls of shape (1, 3, 3) @ (3, N3), which is significantly faster.
232 A_wnn = np.tile(np.eye(N3, dtype=complex), (W, 1, 1))
233 for i in range(Ni):
234 n0 = i * 3
235 A_wnn[:, n0:n0 + 3, :] += dm_wNvv[:, i] @ K_block[i]
237 rhs_wnv = np.ascontiguousarray(dm_wNvv.reshape(W, N3, 3))
238 red_wnv = solve(A_wnn, rhs_wnv)
239 return red_wnv.reshape(W, Ni, 3, 3).sum(axis=1)
241 def _get_dynamic_polarizability_imaginary_frequency(
242 self, frequencies: Array) -> np.ndarray:
243 raise NotImplementedError()