forked from PennyLaneAI/pennylane-sf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgbs.py
322 lines (246 loc) · 11.3 KB
/
gbs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# Copyright 2018-2020 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains the variational GBS device.
The Strawberry Fields variational GBS device is a simulator that provides a way to encode
variational parameters into GBS so that the gradient with respect to the output probability
distribution is accessible.
"""
from collections import OrderedDict
import numpy as np
import strawberryfields as sf
from strawberryfields.ops import GraphEmbed, MeasureFock
from strawberryfields.utils import all_fock_probs_pnr
from thewalrus.quantum import adj_scaling as rescale
from thewalrus.quantum import photon_number_mean_vector
import pennylane as qml
from pennylane.wires import Wires
from .expectations import identity
from .simulator import StrawberryFieldsSimulator
# pylint: disable=too-many-instance-attributes
class StrawberryFieldsGBS(StrawberryFieldsSimulator):
r"""StrawberryFields variational GBS device for PennyLane.
This device provides a simulator that can embed variational parameters into GBS such that the
analytic gradient of the probability distribution is accessible.
For more details see :doc:`/devices/gbs`.
Args:
wires (int or Iterable[Number, str]]): the number of modes to initialize the device in
shots (int): Number of circuit evaluations/random samples used
to estimate expectation values of observables. If ``shots=None``,
circuit evaluations are computed analytically.
cutoff_dim (int): Fock-space truncation dimension
use_cache (bool): indicates whether to use samples from previous evaluations to speed up
calculation of the probability distribution. If ``shots=None``, this setting is
ignored.
samples (array): pre-generated samples using the input adjacency matrix specified by
:class:`ParamGraphEmbed`. In non-analytic mode with ``use_cache=True``, probabilities
will be inferred from these samples rather than generating new samples, resulting in
faster evaluation of the circuit and its derivative.
"""
name = "Strawberry Fields variational GBS PennyLane plugin"
short_name = "strawberryfields.gbs"
_operation_map = {
"ParamGraphEmbed": GraphEmbed,
}
_observable_map = {
"Identity": identity,
}
_circuits = {}
_capabilities = {"model": "cv", "provides_jacobian": True}
def __init__(self, wires, *, shots=None, cutoff_dim, use_cache=False, samples=None):
super().__init__(wires, shots=shots)
self.cutoff = cutoff_dim
self._use_cache = use_cache
self.samples = samples
self._params = None
self._WAW = None
self.Z_inv = None
@staticmethod
def calculate_WAW(params, A):
"""Calculates the :math:`WAW` matrix.
Args:
params (array[float]): variable parameters
A (array[float]): adjacency matrix
Returns:
array[float]: the :math:`WAW` matrix
"""
W = np.diag(np.sqrt(params))
return W @ A @ W
@staticmethod
def calculate_n_mean(A):
"""Calculates the mean number of photons for an adjacency matrix encoded into GBS.
.. warning::
Note that for ``A`` to be directly encoded into GBS, its singular values must
be less than one.
Args:
A (array[float]): adjacency matrix
Returns:
float: mean number of photons
"""
singular_values = np.linalg.svd(A, compute_uv=False)
if not all(singular_values < 1):
raise ValueError("Singular values of matrix A must be less than 1")
return np.sum(singular_values**2 / (1 - singular_values**2))
# pylint: disable=missing-function-docstring
def execute(self, queue, observables, parameters=None, **kwargs):
parameters = parameters or {}
if len(queue) > 1:
raise ValueError(
"The StrawberryFieldsGBS device accepts only a single application of ParamGraphEmbed"
)
return super().execute(queue, observables, parameters, **kwargs)
# pylint: disable=pointless-statement,expression-not-assigned
def apply(self, operation, wires, par):
self._params, A, n_mean = par
A *= rescale(A, n_mean)
self.Z_inv = self._calculate_z_inv(A)
if len(self._params) != self.num_wires:
raise ValueError(
"The number of variable parameters must be equal to the total number of wires."
)
self._WAW = self.calculate_WAW(self._params, A)
if self.shots is not None and self._use_cache:
op = GraphEmbed(A, mean_photon_per_mode=n_mean / len(A))
else:
n_mean_WAW = self.calculate_n_mean(self._WAW)
op = GraphEmbed(self._WAW, mean_photon_per_mode=n_mean_WAW / len(A))
op | [self.q[wires.index(i)] for i in wires]
if self.shots is not None:
MeasureFock() | [self.q[wires.index(i)] for i in wires]
def reset(self):
if self.shots is not None and self._use_cache and self.samples is not None:
# In this case, we do not reset because we want to keep our samples
return
super().reset()
def pre_measure(self):
self.eng = sf.Engine("gaussian", backend_options={"cutoff_dim": self.cutoff})
if self.shots is None:
results = self.eng.run(self.prog)
elif self._use_cache and self.samples is not None: # use the cached samples
return
else:
results = self.eng.run(self.prog, shots=self.shots)
self.state = results.state
self.samples = results.samples
def _reparametrize_probability(self, p):
"""Takes an input probability distribution of :math:`A` and rescales it to the probability
distribution of :math:`WAW`.
Args:
p (array): the probability distribution of :math:`A`
Returns:
array: the probability distribution of :math:`WAW`
"""
Z_inv = self._calculate_z_inv(self._WAW)
ind_all_wires = np.ndindex(*[self.cutoff] * self.num_wires)
for s in ind_all_wires:
res = np.prod(np.power(self._params, s))
p[tuple(s)] *= res * Z_inv / self.Z_inv
return p
def _marginal_over_wires(self, wires, array):
"""Calculates the marginal distribution over the specified wires by summing over the
remaining.
The flat input array is reshaped to have ``self.num_wires + 1`` axes.
Args:
wires (int or Iterable[Number, str]]): the wires to find marginals over
array (array): the input array
Returns:
array: the traced-over array
"""
trailing = int(array.size / self.cutoff**self.num_wires)
array = array.reshape([self.cutoff] * self.num_wires + [trailing])
all_indices = set(range(self.num_wires))
requested_indices = set(self.wires.indices(wires))
trace_over_indices = all_indices - requested_indices # Find indices to trace over
return np.sum(array, axis=tuple(trace_over_indices)).ravel()
def probability(self, wires=None):
wires = wires or self.wires
wires = Wires(wires)
if self.shots is None:
return super().probability(wires=wires)
samples = np.take(self.samples, self.wires.indices(wires), axis=1)
fock_probs = all_fock_probs_pnr(samples)
cutoff = fock_probs.shape[0]
diff = max(self.cutoff - cutoff, 0)
probs = np.pad(fock_probs, [(0, diff)] * len(wires))
if self._use_cache:
if len(wires) < self.num_wires:
raise ValueError(
"Caching is only supported when returning the probabilities on "
"all of the wires"
)
probs = self._reparametrize_probability(probs)
ind = np.ndindex(*[self.cutoff] * len(wires))
probs = OrderedDict((tuple(i), probs[tuple(i)]) for i in ind)
return probs
def jacobian(self, tape):
"""Calculates the Jacobian of the device.
Args:
tape (.QuantumTape): the tape to compute the Jacobian of.
Returns:
array[float]: Jacobian matrix of size (``len(probs)``, ``num_wires``)
"""
self.reset()
operations, observables = (tape.operations, tape.observables)
requested_wires = observables[0].wires
# Create dummy observable to measure probabilities over all wires
obs_all_wires = qml.probs(wires=self.wires)
prob = self.execute(operations, [obs_all_wires])[0]
jac = self._jacobian_all_wires(prob)
if requested_wires == self.wires:
return jac
return self._marginal_over_wires(requested_wires, jac).reshape(-1, self.num_wires)
def _jacobian_all_wires(self, prob):
"""Calculates the jacobian of the probability distribution with respect to all wires.
This function uses Eq. (28) of `this <https://arxiv.org/pdf/2004.04770.pdf>`__ paper.
Args:
prob (array[float]): the probability distribution as a flat array
Returns:
array[float]: the jacobian
"""
n = len(self._WAW)
disp = np.zeros(2 * n)
cov = self.calculate_covariance(self._WAW, hbar=self.hbar)
mean_photons_by_mode = photon_number_mean_vector(disp, cov, hbar=self.hbar)
basis_states = np.array(list(np.ndindex(*[self.cutoff] * self.num_wires)))
jac = (basis_states - mean_photons_by_mode) * np.expand_dims(prob, 1) / self._params
return jac
@staticmethod
def calculate_covariance(A, hbar):
r"""Calculates the covariance matrix corresponding to an input adjacency matrix.
Args:
A (array[float]): adjacency matrix
hbar (float): the convention chosen in the canonical commutation relation
:math:`[x, p] = i \hbar`
Returns:
array[float]: covariance matrix
"""
n = len(A)
I = np.identity(2 * n)
o_mat = np.block([[np.zeros_like(A), np.conj(A)], [A, np.zeros_like(A)]])
cov = hbar * (np.linalg.inv(I - o_mat) - I / 2)
return cov
@staticmethod
def _calculate_z_inv(A):
"""Calculates the 1/Z normalization coefficient from GBS.
Args:
A (array): adjacency matrix
Returns:
float: the coefficient
"""
n = len(A)
I = np.identity(2 * n)
o_mat = np.block([[np.zeros_like(A), np.conj(A)], [A, np.zeros_like(A)]])
return np.sqrt(np.linalg.det(I - o_mat))
@property
def use_cache(self):
"""Indicates whether to use samples from previous evaluations to speed up calculation of
the probability distribution. If ``shots=None``, this setting is ignored."""
return self._use_cache