diff --git a/src/qibo/backends/abstract.py b/src/qibo/backends/abstract.py index a4c7685423..73672146e7 100644 --- a/src/qibo/backends/abstract.py +++ b/src/qibo/backends/abstract.py @@ -397,6 +397,7 @@ def expectation_observable_symbolic_from_samples( nqubits: int, constant: float, nshots: int, + **measurements_kwargs, ) -> float: """Compute the expectation value of a general symbolic observable defined by groups of terms that can be diagonalized simultaneously, starting from the samples. @@ -418,6 +419,8 @@ def expectation_observable_symbolic_from_samples( rotated_circuits = [] qubit_maps = [] + # if "readout_mitigation" in measurements_kwargs: + # breakpoint() # loop over the terms that can be diagonalized simultaneously for terms_qubits, terms_observables in zip( diagonal_terms_qubits, diagonal_terms_observables @@ -433,15 +436,18 @@ def expectation_observable_symbolic_from_samples( # prepare the measurement basis and append it to the circuit for qubit, factor in zip(qubits, observable): if factor != "I" and qubit not in measurements: - measurements[qubit] = gates.M( - qubit, basis=getattr(gates, factor) - ) + measurements[qubit] = getattr(gates, factor) # Get the qubits we want to measure for each term - qubit_maps.append(measurements.keys()) + qubits = measurements.keys() + qubit_maps.append(qubits) + measurement_gate = gates.M( + *qubits, basis=list(measurements.values()), **measurements_kwargs + ) circ_copy = circuit.copy(True) - circ_copy.add(list(measurements.values())) + # circ_copy.add(list(measurements.values())) + circ_copy.add(measurement_gate) rotated_circuits.append(circ_copy) # execute the circuits diff --git a/src/qibo/error_mitigation/__init__.py b/src/qibo/error_mitigation/__init__.py new file mode 100644 index 0000000000..3355116b89 --- /dev/null +++ b/src/qibo/error_mitigation/__init__.py @@ -0,0 +1,7 @@ +from qibo.error_mitigation.cdr import CDR +from qibo.error_mitigation.inverse_response_matrix import InverseResponseMatrix +from qibo.error_mitigation.iterative_bayesian_unfolding import ( + IterativeBayesianUnfolding, +) +from qibo.error_mitigation.vncdr import VNCDR +from qibo.error_mitigation.zne import ZNE diff --git a/src/qibo/error_mitigation/abstract.py b/src/qibo/error_mitigation/abstract.py new file mode 100644 index 0000000000..e77434bd7a --- /dev/null +++ b/src/qibo/error_mitigation/abstract.py @@ -0,0 +1,296 @@ +"""Abstract Error Mitigation Routine object""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from functools import cached_property +from types import NoneType +from typing import Callable, Dict, Iterable, List, Optional, Union + +import numpy as np +from scipy.optimize import curve_fit + +from qibo import Circuit +from qibo.backends import construct_backend, get_transpiler +from qibo.backends.abstract import Backend +from qibo.config import raise_error +from qibo.hamiltonians.abstract import AbstractHamiltonian +from qibo.measurements import MeasurementResult +from qibo.noise import NoiseModel +from qibo.transpiler import Passes + + +@dataclass +class ReadoutMitigationRoutine(ABC): + + nshots: Optional[int] = None + _backend: Union[Backend, NoneType] = None + _noise_model: NoiseModel = None + _nqubits: int = None + + @property + def backend(self): + if self._backend is None: + raise_error(RuntimeError, "Backend not initialized yet.") + return self._backend + + @backend.setter + def backend(self, new_backend: Backend): + self._backend = new_backend + + @property + def noise_model(self): + if self._noise_model is None: + raise_error(RuntimeError, "NoiseModel not initialized yet.") + return self._noise_model + + @property + def nqubits(self): + if self._nqubits is None: + raise_error(RuntimeError, "nqubits not initialized yet.") + return self._nqubits + + @abstractmethod + def __call__(self, frequencies: Dict[int | str, int]) -> Dict[int | str, int]: + pass + + @staticmethod + def binary_to_integer_keys( + frequencies: Dict[str, int | float], + ) -> Dict[int, int | float]: + return {int(key, 2): value for key, value in frequencies.items()} + + @staticmethod + def integer_to_binary_keys( + frequencies: Dict[int, int | float], nqubits: int + ) -> Dict[str, int | float]: + return {f"{i:0{nqubits}b}": value for i, value in enumerate(frequencies)} + + +class MitigatedMeasurementResult(MeasurementResult): + pass + + +@dataclass +class ErrorMitigationRoutine(ABC): + + circuit: Optional[Circuit] = None + observable: Optional[AbstractHamiltonian] = None + noise_model: Optional[NoiseModel] = None + transpiler: Optional[Passes] = None + readout_mitigation: Optional[ReadoutMitigationRoutine] = None + + def __post_init__( + self, + ): + if self.transpiler is None: + self.transpiler = get_transpiler() + if self.readout_mitigation is not None: + self.readout_mitigation._noise_model = self.noise_model + self.readout_mitigation._backend = self.backend + + @property + def backend(self): + if self.observable is not None: + return self.observable.backend + raise_error(RuntimeError, "No observable defined yet, no backend available.") + + def _circuit(self, circuit: Circuit) -> Circuit: + if circuit is None: + if self.circuit is None: + raise_error( + RuntimeError, + "No circuit provided, please either initialize the the mitigation routine with a circuit, or provide it upon call.", + ) + return self.circuit + return circuit + + def _noise_model( + self, noise_model: Union[NoiseModel, NoneType] + ) -> Union[NoiseModel, NoneType]: + if noise_model is None: + return self.noise_model + return noise_model + + def _observable( + self, observable: Union[AbstractHamiltonian, NoneType] + ) -> Union[AbstractHamiltonian, NoneType]: + if observable is None: + if self.observable is None: + raise_error( + RuntimeError, + "No observable provided, please either initialize the the mitigation routine with an observable, or provide it upon call.", + ) + return self.observable + # this is mostly useful to have a consistent backend available + self.observable = observable + return observable + + @abstractmethod + def __call__( + self, + circuit: Circuit, + observable: AbstractHamiltonian, + nshots: Optional[int] = None, + noise_model: Optional[NoiseModel] = None, + ): + pass + + def _circuit_preprocessing( + self, circuits: List[Circuit], noise_model: Optional[NoiseModel] = None + ) -> List[Circuit]: + noise = self._noise_model(noise_model) + new_circuits = [] + for circ in circuits: + if noise is not None: + circ = noise.apply(circ) + circ, _ = self.transpiler(circ) + new_circuits.append(circ) + return new_circuits + + +@dataclass +class DataRegressionErrorMitigation(ErrorMitigationRoutine): + + n_training_samples: Optional[int] = 50 + regression_model: Optional[Callable] = None + model_parameters: Optional[Iterable[float]] = None + simulation_backend: Optional[Backend] = None + _is_trained: bool = False + + def __post_init__(self): + super().__post_init__() + if self.simulation_backend is None: + self.simulation_backend = construct_backend("numpy") + + @abstractmethod + def sample_circuit(self, circuit: Optional[Circuit] = None) -> Circuit: + pass + + def _training_circuits(self, circuit: Optional[Circuit] = None) -> List[Circuit]: + if circuit is None: + return self.training_circuits + return [self.sample_circuit(circuit) for _ in range(self.n_training_samples)] + + @cached_property + def training_circuits(self) -> List[Circuit]: + return [ + self.sample_circuit(self.circuit) for _ in range(self.n_training_samples) + ] + + @property + def is_trained(self) -> bool: + return self._is_trained + + @is_trained.setter + def is_trained(self, trained: bool): + self._is_trained = trained + + @property + def xdata_shape(self): + return (self.n_training_samples,) + + @staticmethod + def _cast_parameters( + circuits: List[Circuit], src_backend: Backend, target_backend: Backend + ): + for circ in circuits: + for gate in circ.parametrized_gates: + params = src_backend.to_numpy( + src_backend.cast(gate.parameters, dtype=src_backend.np.float64) + ) + gate.parameters = target_backend.cast( + params, dtype=target_backend.np.float64 + ) + + def regression( + self, + training_circuits: Optional[List[Circuit]] = None, + observable: Optional[AbstractHamiltonian] = None, + nshots: Optional[int] = None, + noise_model: Optional[NoiseModel] = None, + ): + if training_circuits is None: + training_circuits = self.training_circuits + + observable = self._observable(observable) + + # first get the noisy expectations running with the current backend + noisy_circuits = self._circuit_preprocessing(training_circuits, noise_model) + noisy_expectations = observable.backend.cast( + [ + observable.expectation(circuit, nshots=nshots) + for circuit in noisy_circuits + ], + dtype=observable.backend.np.float64, + ) + # cast to numpy + noisy_expectations = self.backend.to_numpy(noisy_expectations) + noisy_expectations = np.reshape(noisy_expectations, self.xdata_shape) + + # then switch to the simulation backend to get the exact values + original_backend = observable.backend + observable.backend = self.simulation_backend + # select the subset of circuits relevant for exact expectations + N = self.xdata_shape[-1] if len(self.xdata_shape) > 1 else 1 + training_circuits = [ + training_circuits[i] for i in N * np.arange(noisy_expectations.shape[0]) + ] + # cast circuits parameters to the simulation backend arrays + self._cast_parameters( + training_circuits, original_backend, self.simulation_backend + ) + exact_expectations = self.simulation_backend.cast( + [ + observable.expectation(circuit, nshots=nshots) + for circuit in training_circuits + ], + dtype=self.simulation_backend.np.float64, + ) + # cast to numpy + exact_expectations = self.simulation_backend.to_numpy(exact_expectations) + # restore the original backend + observable.backend = original_backend + + # do the regression + optimal_params, params_cov = curve_fit( + self.regression_model, + noisy_expectations, + exact_expectations, + self.model_parameters, + ) + return optimal_params + + def _apply_model( + self, + circuit: Optional[Circuit] = None, + observable: Optional[AbstractHamiltonian] = None, + nshots: Optional[int] = None, + noise_model: Optional[NoiseModel] = None, + ): + circuit = self._circuit(circuit) + circuit = self._circuit_preprocessing([circuit], noise_model)[0] + observable = self._observable(observable) + noisy_exp_val = observable.expectation(circuit, nshots=nshots) + params = self.backend.cast(self.model_parameters, dtype=noisy_exp_val.dtype) + return self.regression_model(noisy_exp_val, *params) + + def __call__( + self, + circuit: Optional[Circuit] = None, + observable: Optional[AbstractHamiltonian] = None, + nshots: Optional[int] = None, + noise_model: Optional[NoiseModel] = None, + ): + training_circuits = self._training_circuits(circuit) + # if something is different retrain the regression model + if ( + circuit is not None + or observable is not None + or noise_model is not None + or not self.is_trained + ): + self.model_parameters = self.regression( + training_circuits, observable, nshots, noise_model + ) + self.is_trained = True + return self._apply_model(circuit, observable, nshots, noise_model) diff --git a/src/qibo/error_mitigation/cdr.py b/src/qibo/error_mitigation/cdr.py new file mode 100644 index 0000000000..72533a2452 --- /dev/null +++ b/src/qibo/error_mitigation/cdr.py @@ -0,0 +1,118 @@ +import copy +import inspect +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple + +import numpy as np + +from qibo import Circuit, gates +from qibo.backends.numpy import NumpyBackend +from qibo.config import raise_error +from qibo.error_mitigation.abstract import DataRegressionErrorMitigation +from qibo.gates.abstract import Gate, ParametrizedGate + +NPBACKEND = NumpyBackend() + + +@dataclass +class CDR(DataRegressionErrorMitigation): + + replacement_gates: Optional[Tuple[Gate, List[Dict]]] = None + _targeted_gate: Gate = None + + def __post_init__(self): + super().__post_init__() + # setup the regression model + self._regression_model_setup() + + # setup the replacement gates + if self.replacement_gates is None: + self.replacement_gates = ( + gates.RZ, + [{"theta": n * np.pi / 2} for n in range(4)], + ) + self._targeted_gate = self.replacement_gates[0] + if not all( + self._targeted_gate(0, **kwargs).clifford + for kwargs in self.replacement_gates[1] + ): + raise_error( + ValueError, + f"The provided set of gates for replacement: {self.replacement_gates} include one or more non clifford gates.", + ) + + def _regression_model_setup(self): + if self.regression_model is None: + self.regression_model = lambda x, a, b: a * x + b + if self.model_parameters is None: + self.model_parameters = np.array([1, 0], dtype=np.float64) + elif self.model_parameters is None: + nparams = len(inspect.signature(self.regression_model).parameters) - 1 + self.model_parameters = np.random.randn(nparams).astype(np.float64) + + def sample_circuit( + self, circuit: Optional[Circuit] = None, sigma: float = 0.5 + ) -> Circuit: + circuit = self._circuit(circuit) + + gates_to_replace = [] + for i, gate in enumerate(circuit.queue): + if isinstance(gate, self._targeted_gate): + if not gate.clifford: + gates_to_replace.append((i, gate)) + if len(gates_to_replace) == 0: + raise_error(RuntimeError, "No non-Clifford gate found, no circuit sampled.") + + replacement, distance = [], [] + for _, gate in gates_to_replace: + rep_gates = np.array( + [ + self._targeted_gate(*gate.init_args, **kwargs) + for kwargs in self.replacement_gates[1] + ] + ) + + replacement.append(rep_gates) + + if isinstance(gate, ParametrizedGate): + gate = copy.deepcopy(gate) + gate.parameters = NPBACKEND.cast( + [self.backend.to_numpy(p) for p in gate.parameters], + dtype=np.float64, + ) + gate_matrix = gate.matrix(NPBACKEND) + rep_gate_matrix = [rep_gate.matrix(NPBACKEND) for rep_gate in rep_gates] + rep_gate_matrix = NPBACKEND.cast( + rep_gate_matrix, dtype=rep_gate_matrix[0].dtype + ) + matrix_norm = NPBACKEND.np.linalg.norm( + gate_matrix - rep_gate_matrix, ord="fro", axis=(1, 2) + ) + distance.append(NPBACKEND.np.real(matrix_norm)) + + distance = np.vstack(distance) + prob = np.exp(-(distance**2) / sigma**2) + + index = np.random.choice( + range(len(gates_to_replace)), + size=min(int(len(gates_to_replace) / 2), 50), + replace=False, + p=np.sum(prob, -1) / np.sum(prob), + ) + + gates_to_replace = np.array([gates_to_replace[i] for i in index]) + prob = [prob[i] for i in index] + prob = NPBACKEND.cast(prob, dtype=prob[0].dtype) + + replacement = np.array([replacement[i] for i in index]) + replacement = [ + replacement[i][np.random.choice(range(len(p)), size=1, p=p / np.sum(p))[0]] + for i, p in enumerate(prob) + ] + replacement = {i[0]: g for i, g in zip(gates_to_replace, replacement)} + + sampled_circuit = circuit.__class__(**circuit.init_kwargs) + for i, gate in enumerate(circuit.queue): + sampled_circuit.add(replacement.get(i, gate)) + + return sampled_circuit diff --git a/src/qibo/error_mitigation/ics.py b/src/qibo/error_mitigation/ics.py new file mode 100644 index 0000000000..1a54b0c739 --- /dev/null +++ b/src/qibo/error_mitigation/ics.py @@ -0,0 +1,11 @@ +from qibo.error_mitigation.cdr import CDR + + +class ICS(CDR): + + def sample_circuit(self, circuit: Optional[Circuit] = None) -> Circuit: + sampled_circuit = super().sample_circuit(circuit) + symplectic_matrix = self.simulation_backend.execute_circuit( + sampled_circuit.invert(), nshots=1 + ).symplectic_matrix[:-1, :-1] + raise NotImplementedError diff --git a/src/qibo/error_mitigation/inverse_response_matrix.py b/src/qibo/error_mitigation/inverse_response_matrix.py new file mode 100644 index 0000000000..9a93a55173 --- /dev/null +++ b/src/qibo/error_mitigation/inverse_response_matrix.py @@ -0,0 +1,68 @@ +from functools import cached_property +from typing import Dict, List + +from numpy.typing import ArrayLike + +from qibo import Circuit, gates +from qibo.error_mitigation.abstract import ReadoutMitigationRoutine + + +class InverseResponseMatrix(ReadoutMitigationRoutine): + + _circuit: Circuit = None + + @cached_property + def response_circuits(self) -> List[Circuit]: + circuits = [] + for i in range(2**self.nqubits): + binary_state = format(i, f"0{self.nqubits}b") + circuit = Circuit(self.nqubits, density_matrix=True) + for qubit, bit in enumerate(binary_state): + if bit == "1": + circuit.add(gates.X(qubit)) + circuit.add(gates.M(*range(self.nqubits))) + circuits.append(self.noise_model.apply(circuit)) + return circuits + + @cached_property + def response_matrix(self): + response_matrix = self.backend.np.zeros((2**self.nqubits, 2**self.nqubits)) + circuit_results = self.backend.execute_circuits( + self.response_circuits, nshots=self.nshots + ) + for i, result in enumerate(circuit_results): + frequencies = result.frequencies() + column = self.backend.np.zeros(2**self.nqubits) + for key, value in frequencies.items(): + column[int(key, 2)] = value / self.nshots + response_matrix[:, i] = column + return response_matrix + + @cached_property + def inverse_response_matrix(self): + return self.backend.np.linalg.inv(self.response_matrix) + + def _frequencies_to_array(self, frequencies: Dict[int, int | float]) -> ArrayLike: + array = self.backend.np.zeros(2**self.nqubits) + for key, value in frequencies.items(): + array[key] = value + return self.backend.np.reshape(array, (-1, 1)) + + def mitigate_frequencies(self, frequencies: ArrayLike) -> ArrayLike: + return self.inverse_response_matrix @ frequencies + + def __call__(self, frequencies: Dict[int | str, int]) -> Dict[int | str, float]: + if self.nshots is None: + self.nshots = sum(tuple(frequencies.values())) + is_key_integer = isinstance(tuple(frequencies)[0], int) + # convert keys to integer representation + if not is_key_integer: + frequencies = self.binary_to_integer_keys(frequencies) + frequencies = self._frequencies_to_array(frequencies) + mitigated_frequencies = self.mitigate_frequencies(frequencies) + mitigated_frequencies = ( + self.backend.to_numpy(mitigated_frequencies).ravel().tolist() + ) + if not is_key_integer: + return self.integer_to_binary_keys(mitigated_frequencies, self.nqubits) + return {i: value for i, value in enumerate(mitigated_frequencies)} diff --git a/src/qibo/error_mitigation/iterative_bayesian_unfolding.py b/src/qibo/error_mitigation/iterative_bayesian_unfolding.py new file mode 100644 index 0000000000..f5291c20e2 --- /dev/null +++ b/src/qibo/error_mitigation/iterative_bayesian_unfolding.py @@ -0,0 +1,38 @@ +from dataclasses import dataclass +from functools import cached_property + +from numpy.typing import ArrayLike + +from qibo.error_mitigation.inverse_response_matrix import InverseResponseMatrix + + +@dataclass +class IterativeBayesianUnfolding(InverseResponseMatrix): + + iterations: int = 10 + + @cached_property + def transposed_inverse_response_matrix(self): + return self.backend.np.transpose(self.inverse_response_matrix, (1, 0)) + + def unfold(self, probabilities: ArrayLike) -> ArrayLike: + N = len(probabilities) + unfolded_probabilities = ( + self.backend.np.ones((N, 1), dtype=probabilities.dtype) / N + ) + for _ in range(self.iterations): + unfolded_probabilities *= self.transposed_inverse_response_matrix @ ( + probabilities / (self.inverse_response_matrix @ unfolded_probabilities) + ) + return unfolded_probabilities + + def mitigate_frequencies(self, frequencies: ArrayLike) -> ArrayLike: + probabilities = frequencies / sum(frequencies) + mitigated_probabilities = self.unfold(probabilities) + mitigated_frequencies = self.backend.np.round( + mitigated_probabilities * sum(frequencies), decimals=0 + ) + mitigated_frequencies = ( + mitigated_frequencies / sum(mitigated_frequencies) + ) * sum(frequencies) + return mitigated_frequencies diff --git a/src/qibo/error_mitigation/mitigator.py b/src/qibo/error_mitigation/mitigator.py new file mode 100644 index 0000000000..003913258d --- /dev/null +++ b/src/qibo/error_mitigation/mitigator.py @@ -0,0 +1,29 @@ +"""Error mitigation object""" + +from dataclasses import dataclass +from typing import List, Optional + +from qibo import Circuit +from qibo.backends.abstract import Backend +from qibo.hamiltonians.abstract import AbstractHamiltonian +from qibo.noise import NoiseModel +from qibo.transpiler import Passes + + +@dataclass +class Mitigator: + + passes: List + transpiler: Optional[Passes] = None + backend: Optional[Backend] = None + + def __call__( + self, + circuit: Circuit, + observable: AbstractHamiltonian, + nshots: Optional[int] = None, + noise_model: Optional[NoiseModel] = None, + ): + for routine in self.passes: + circuit, exp_val = routine(circuit, observable, nshots, noise_model) + return exp_val diff --git a/src/qibo/error_mitigation/mitiq.py b/src/qibo/error_mitigation/mitiq.py new file mode 100644 index 0000000000..45eabdfb07 --- /dev/null +++ b/src/qibo/error_mitigation/mitiq.py @@ -0,0 +1,72 @@ +from dataclasses import dataclass +from types import NoneType +from typing import Callable, Dict, Optional, Union + +import mitiq + +from qibo.config import raise_error +from qibo.error_mitigation.abstract import ( + ErrorMitigationRoutine, + ReadoutMitigationRoutine, +) +from qibo.hamiltonians import SymbolicHamiltonian +from qibo.models.circuit import Circuit +from qibo.noise import NoiseModel + + +@dataclass +class Mitiq(ErrorMitigationRoutine): + + method: str = None + method_kwargs: Optional[Dict] = None + _run_method: str = None + + def __post_init__(self): + if self.method is None: + raise_error(RuntimeError, "Missing mandatory `method` argument.") + self._run_method = f"run_with_{method}" + self.method = getattr(mitiq, self.method) + if self.method_kwargs is None: + self.method_kwargs = {} + + @property + def _run(self) -> Callable: + return getattr(self.method, self._run_method) + + def _qibo_observable_to_mitiq( + self, observable: SymbolicHamiltonian + ) -> mitiq.Observable: + pauli_strings = [] + for coefficient, pauli, qubits in zip(*observable.simple_terms): + pauli_string = "" + for q in range(max(qubits)): + if q in qubits: + index = qubits.index(q) + pauli_string += pauli[index] + else: + pauli_string += "I" + pauli_strings.append( + mitiq.PauliString(spec=pauli_string, coeff=coefficient) + ) + return mitiq.Observable(*pauli_strings) + + def _observable( + self, observable: Union[SymbolicHamiltonian, NoneType] + ) -> Union[SymbolicHamiltonian, NoneType]: + observable = super()._observable(observable) + return self._qibo_observable_to_mitiq(observable) + + def __call__( + self, + circuit: Circuit, + observable: SymbolicHamiltonian, + nshots: Optional[int] = None, + noise_model: Optional[NoiseModel] = None, + **mitiq_kwargs, + ): + circuit = self._circuit_preprocessing([circuit], noise_model)[0] + observable = self._observable(observable) + kwargs = self.method_kwargs.update(mitiq_kwargs) + if observable is not None: + kwargs["observable"] = observable + return self._run(circuit, **kwargs) diff --git a/src/qibo/error_mitigation/randomized_readout.py b/src/qibo/error_mitigation/randomized_readout.py new file mode 100644 index 0000000000..ef1f36de45 --- /dev/null +++ b/src/qibo/error_mitigation/randomized_readout.py @@ -0,0 +1,80 @@ +from dataclasses import dataclass +from functools import cached_property +from typing import Dict, List, Optional, Tuple + +from qibo import gates +from qibo.error_mitigation.abstract import ReadoutMitigationRoutine +from qibo.models import circuit +from qibo.models.circuit import Circuit +from qibo.quantum_info.random_ensembles import random_pauli + + +@dataclass +class RandomizedReadout(ReadoutMitigationRoutine): + + circuit: Optional[Circuit] = None + n_circuits: int = 10 + + @staticmethod + def _sample_X_pauli( + nqubits: int, seed: Optional[int] = None + ) -> Tuple[Circuit, Dict[int, int]]: + x_gate = random_pauli( + nqubits, 1, subset=["I", "X"], return_circuit=True, seed=seed + ) + error_map = { + gate.qubits[0]: 1 for gate in x_gate.queue if isinstance(gate, gates.X) + } + return x_gate, error_map + + @cached_property + def X_paulis(self) -> Tuple[List[Circuit], List[Dict[int, int]]]: + x_paulis, error_maps = zip( + *[self._sample_X_pauli(self.nqubits) for _ in range(self.n_circuits)] + ) + return x_paulis, error_maps + + @staticmethod + def measurements_layer( + nqubits: int, error_map: Optional[Dict[int, int]] = None, **circ_kwargs + ) -> Circuit: + meas_circ = Circuit(nqubits, **circ_kwargs) + meas_circ.add(gates.M(*range(nqubits), p0=error_map)) + return meas_circ + + @cached_property + def calibration_circuits(self) -> Tuple[List[Circuit], List[Circuit]]: + empty_circuits = [ + x_pauli + self.measurements_layer(self.nqubits, error_map=err_map) + for x_pauli, err_map in self.X_paulis + ] + circuits = [ + self.circuit.copy(deep=True) + + self.measurements_layer(self.nqubits, error_map=err_map) + for _, err_map in self.X_paulis + ] + return empty_circuits, circuits + + def _sample_calibration_circuit( + self, + circuit: Optional[Circuit] = None, + seed: Optional[int] = None, + ) -> Tuple[Circuit, Dict[int, int]]: + x_pauli, error_map = self._sample_X_pauli(circuit.nqubits, seed) + return ( + circuit.copy(deep=True) + + x_pauli + + self.measurements_layer(circuit.nqubits), + error_map, + ) + + def lam(self, nshots): + frequencies = [ + result.frequencies(binary=False) + for result in self.backend.execute_circuits( + sum(*self.calibration_circuits), nshots=nshots + ) + ] + + def __call__(self, frequencies: Dict[int | str, int]) -> Dict[int | str, int]: + pass diff --git a/src/qibo/error_mitigation/vncdr.py b/src/qibo/error_mitigation/vncdr.py new file mode 100644 index 0000000000..f533a3effb --- /dev/null +++ b/src/qibo/error_mitigation/vncdr.py @@ -0,0 +1,89 @@ +from dataclasses import dataclass +from functools import cached_property +from typing import List, Optional + +import numpy as np + +from qibo.config import raise_error +from qibo.error_mitigation.cdr import CDR +from qibo.error_mitigation.zne import ZNE +from qibo.hamiltonians.abstract import AbstractHamiltonian +from qibo.models.circuit import Circuit +from qibo.noise import NoiseModel + + +@dataclass +class VNCDR(ZNE, CDR): + + def __post_init__(self): + super().__post_init__() + + def _regression_model_setup(self): + if self.regression_model is None: + self.regression_model = lambda x, *params: sum( + [xx * p for xx, p in zip(x.T, params)] + ) # lambda x, *params: x @ np.asarray(params) + if self.model_parameters is None: + self.model_parameters = np.ones( + (len(self.noise_levels),), dtype=np.float64 + ) + elif self.model_parameters is None: + raise_error( + ValueError, + "Please provide also the parameters through the `model_parameters` argument when using a custom regression model.", + ) + + @property + def xdata_shape(self): + return (-1, len(self.noise_levels)) + + def _training_circuits(self, circuit: Optional[Circuit] = None) -> List[Circuit]: + if circuit is None: + return self.training_circuits + circuits = [] + for _ in range(self.n_training_samples): + circ = self.sample_circuit(circuit) + noisier_circuits = [ + self.build_noisy_circuit(circ, level) for level in self.noise_levels + ] + circuits.extend(noisier_circuits) + return circuits + + @cached_property + def training_circuits(self) -> List[Circuit]: + circuits = [] + for _ in range(self.n_training_samples): + circ = self.sample_circuit(self.circuit) + noisier_circuits = [ + self.build_noisy_circuit(circ, level) for level in self.noise_levels + ] + circuits.extend(noisier_circuits) + return circuits + + def _apply_model( + self, + circuit: Optional[Circuit] = None, + observable: Optional[AbstractHamiltonian] = None, + nshots: Optional[int] = None, + noise_model: Optional[NoiseModel] = None, + ): + noisy_circuits = self._noisy_circuits(circuit) + noisy_circuits = self._circuit_preprocessing(noisy_circuits, noise_model) + observable = self._observable(observable) + noisy_exp_vals = [ + observable.expectation(circ, nshots=nshots) for circ in noisy_circuits + ] + noisy_exp_vals = self.backend.cast( + noisy_exp_vals, dtype=noisy_exp_vals[0].dtype + ) + params = self.backend.cast(self.model_parameters, dtype=noisy_exp_vals.dtype) + return self.regression_model(noisy_exp_vals, *params) + + def __call__( + self, + circuit: Optional[Circuit] = None, + observable: Optional[AbstractHamiltonian] = None, + nshots: Optional[int] = None, + noise_model: Optional[NoiseModel] = None, + ): + return CDR.__call__(self, circuit, observable, nshots, noise_model) diff --git a/src/qibo/error_mitigation/zne.py b/src/qibo/error_mitigation/zne.py new file mode 100644 index 0000000000..8f93779645 --- /dev/null +++ b/src/qibo/error_mitigation/zne.py @@ -0,0 +1,148 @@ +import math +from dataclasses import dataclass +from functools import cache, cached_property +from typing import Iterable, List, Optional, Tuple + +import numpy as np + +from qibo import Circuit, gates +from qibo.config import raise_error +from qibo.error_mitigation.abstract import ErrorMitigationRoutine +from qibo.hamiltonians.abstract import AbstractHamiltonian +from qibo.noise import NoiseModel + + +@dataclass +class ZNE(ErrorMitigationRoutine): + + noise_levels: Iterable[int] = tuple(range(4)) + insertion_gate: str = "CNOT" + global_unitary_folding: bool = True + + def __post_init__(self): + super().__post_init__() + self.insertion_gate = getattr(gates, self.insertion_gate) + if not self.insertion_gate in (gates.RX, gates.CNOT): + raise_error( + ValueError, + f"Gate {self.insertion_gate} is not supported, please use either `RX` or `CNOT`.", + ) + + @staticmethod + @cache + def gammas(noise_levels: Tuple[int, ...], analytical: bool = True) -> np.ndarray: + """Standalone function to compute the ZNE coefficients given the noise levels. + + Args: + noise_levels (numpy.ndarray): array containing the different noise levels. + Note that in the CNOT insertion paradigm this corresponds to + the number of CNOT pairs to be inserted. The canonical ZNE + noise levels are obtained as ``2 * c + 1``. + analytical (bool, optional): if ``True``, computes the coeffients by solving the + linear system. If ``False``, use the analytical solution valid + for the CNOT insertion method. Default is ``True``. + + Returns: + numpy.ndarray: the computed coefficients. + """ + noise_levels = np.asarray(noise_levels) + if analytical: + noise_levels = 2 * noise_levels + 1 + a_matrix = np.array([noise_levels**i for i in range(len(noise_levels))]) + b_vector = np.zeros(len(noise_levels)) + b_vector[0] = 1 + zne_coefficients = np.linalg.solve(a_matrix, b_vector) + else: + max_noise_level = noise_levels[-1] + zne_coefficients = np.array( + [ + 1 + / (2 ** (2 * max_noise_level) * math.factorial(i)) + * (-1) ** i + / (1 + 2 * i) + * math.factorial(1 + 2 * max_noise_level) + / ( + math.factorial(max_noise_level) + * math.factorial(max_noise_level - i) + ) + for i in noise_levels + ] + ) + return zne_coefficients + + def build_noisy_circuit( + self, circuit: Optional[Circuit] = None, num_insertions: int = 1 + ) -> Circuit: + circuit = self._circuit(circuit) + + if self.global_unitary_folding: + copy_c = circuit.copy(deep=True) + noisy_circuit = copy_c + for _ in range(num_insertions): + noisy_circuit += copy_c.invert() + copy_c + return noisy_circuit + + if ( + self.insertion_gate is gates.CNOT and circuit.nqubits < 2 + ): # pragma: no cover + raise_error( + ValueError, + "Provide a circuit with at least 2 qubits when using the 'CNOT' insertion gate. " + + "Alternatively, try with the 'RX' insertion gate instead.", + ) + + # theta = np.pi / 2 # this or the actual angle?? + noisy_circuit = Circuit(**circuit.init_kwargs) + + for gate in circuit.queue: + noisy_circuit.add(gate) + if isinstance(gate, self.insertion_gate): + gate_kwargs_1 = gate_kwargs_2 = {} + if gate is gates.RX: + gate_kwargs_1 = {"theta": gate.init_kwargs["theta"]} + gate_kwargs_2 = {"theta": -gate.init_kwargs["theta"]} + for _ in range(num_insertions): + noisy_circuit.add( + self.insertion_gate(*gate.qubits, **gate_kwargs_1) + ) + noisy_circuit.add( + self.insertion_gate(*gate.qubits, **gate_kwargs_2) + ) + return noisy_circuit + + def _noisy_circuits(self, circuit: Optional[Circuit] = None): + if circuit is None: + return self.noisy_circuits + return [ + self.build_noisy_circuit(circuit, num_insertions) + for num_insertions in self.noise_levels + ] + + @cached_property + def noisy_circuits(self) -> List[Circuit]: + return [ + self.build_noisy_circuit(self.circuit, num_insertions) + for num_insertions in self.noise_levels + ] + + def __call__( + self, + circuit: Optional[Circuit] = None, + observable: Optional[AbstractHamiltonian] = None, + nshots: Optional[int] = None, + noise_model: Optional[NoiseModel] = None, + ): + noisy_circuits = self._noisy_circuits(circuit) + noisy_circuits = self._circuit_preprocessing(noisy_circuits, noise_model) + observable = self._observable(observable) + exp_vals = [ + observable.expectation( + circ, nshots=nshots, readout_mitigation=self.readout_mitigation + ) + for circ in noisy_circuits + ] + exp_vals = self.backend.np.stack(exp_vals) + gammas = self.backend.cast( + self.gammas(self.noise_levels), dtype=exp_vals[0].dtype + ) + return sum(gammas * exp_vals) diff --git a/src/qibo/gates/measurements.py b/src/qibo/gates/measurements.py index a5fb14e2b8..54aaaed403 100644 --- a/src/qibo/gates/measurements.py +++ b/src/qibo/gates/measurements.py @@ -50,6 +50,7 @@ def __init__( basis: Union[Gate, str] = Z, p0: Optional["ProbsType"] = None, # type: ignore p1: Optional["ProbsType"] = None, # type: ignore + readout_mitigation: Optional["ReadoutMitigationRoutine"] = None, ): super().__init__() self.name = "measure" @@ -58,6 +59,9 @@ def __init__( self.register_name = register_name self.collapse = collapse self.result = MeasurementResult(self.target_qubits) + self.readout_mitigation = readout_mitigation + if self.readout_mitigation is not None: + self.readout_mitigation._nqubits = len(self.target_qubits) # list of measurement pulses implementing the gate # relevant for experiments only self.pulses = None @@ -82,6 +86,7 @@ def __init__( "basis": [g.__name__ for g in self.basis_gates], "p0": p0, "p1": p1, + "readout_mitigation": readout_mitigation, } if collapse: if p0 is not None or p1 is not None: diff --git a/src/qibo/hamiltonians/hamiltonians.py b/src/qibo/hamiltonians/hamiltonians.py index 669929daad..f0d81a398a 100644 --- a/src/qibo/hamiltonians/hamiltonians.py +++ b/src/qibo/hamiltonians/hamiltonians.py @@ -593,7 +593,7 @@ def calculate_dense(self) -> Hamiltonian: # costly ``sympy.expand`` call return self._calculate_dense_from_form() - def expectation(self, circuit: "Circuit", nshots: Optional[int] = None) -> float: # type: ignore + def expectation(self, circuit: "Circuit", nshots: Optional[int] = None, **measurements_kwargs) -> float: # type: ignore """Computes the expectation value for a given circuit. Args: @@ -620,7 +620,11 @@ def expectation(self, circuit: "Circuit", nshots: Optional[int] = None) -> float return self.dense.expectation(circuit) terms_coefficients, terms, term_qubits = self.simple_terms return self.constant.real + self.backend.expectation_observable_symbolic( - circuit, terms, term_qubits, terms_coefficients, self.nqubits + circuit, + terms, + term_qubits, + terms_coefficients, + self.nqubits, ) terms_coefficients, terms_observables, terms_qubits = self.diagonal_simple_terms @@ -632,6 +636,7 @@ def expectation(self, circuit: "Circuit", nshots: Optional[int] = None) -> float self.nqubits, self.constant.real, nshots, + **measurements_kwargs, ) def expectation_from_samples( diff --git a/src/qibo/measurements.py b/src/qibo/measurements.py index 39390597d1..71d3c2cfff 100644 --- a/src/qibo/measurements.py +++ b/src/qibo/measurements.py @@ -1,9 +1,7 @@ import collections -import numpy as np import sympy -from qibo import gates from qibo.config import raise_error diff --git a/src/qibo/models/circuit.py b/src/qibo/models/circuit.py index 61226da9af..b8f6b17918 100644 --- a/src/qibo/models/circuit.py +++ b/src/qibo/models/circuit.py @@ -649,12 +649,23 @@ def add(self, gate): # all the gates in the basis of the measure gates should not # be added to the new circuit, otherwise once the measure gate # is added in the circuit there will be two of the same. + measurement_circuit = Circuit(**self.init_kwargs) for base in gate.basis: if base not in self.queue: - self.add(base) + measurement_circuit.add(base) + gate.basis = [] + gate.basis_gates = [gates.Z] + measurement_circuit.queue.append(gate) + if ( + gate.readout_mitigation is not None + and gate.readout_mitigation.noise_model is not None + ): + measurement_circuit = gate.readout_mitigation.noise_model.apply( + measurement_circuit + ) + self.queue += measurement_circuit.queue - self.queue.append(gate) if gate.register_name is None: # add default register name nreg = self.queue.nmeasurements - 1 diff --git a/src/qibo/models/error_mitigation.py b/src/qibo/models/error_mitigation.py index 6328240dcc..bf2ac3a945 100644 --- a/src/qibo/models/error_mitigation.py +++ b/src/qibo/models/error_mitigation.py @@ -425,7 +425,6 @@ def CDR( `arXiv:2005.10189 [quant-ph] `_. """ backend, local_state = _check_backend_and_local_state(seed, backend) - if readout is None: readout = {} diff --git a/src/qibo/noise.py b/src/qibo/noise.py index 6ceec82ced..77375816db 100644 --- a/src/qibo/noise.py +++ b/src/qibo/noise.py @@ -313,6 +313,10 @@ def apply(self, circuit): noisy_circuit = circuit.__class__(**circuit.init_kwargs) for gate in circuit.queue: + readout_mitigation = None + if isinstance(gate, gates.M) and gate.readout_mitigation is not None: + readout_mitigation = gate.readout_mitigation + gate.readout_mitigation = None errors_list = ( self.errors[gate.__class__] if isinstance(gate, (gates.Channel, gates.M)) @@ -370,6 +374,9 @@ def apply(self, circuit): ): noisy_circuit.add(gate) + if isinstance(gate, gates.M): + gate.readout_mitigation = readout_mitigation + return noisy_circuit diff --git a/src/qibo/result.py b/src/qibo/result.py index 696006a9c4..51939eb339 100644 --- a/src/qibo/result.py +++ b/src/qibo/result.py @@ -222,23 +222,25 @@ def frequencies(self, binary: bool = True, registers: bool = False): if self._repeated_execution_frequencies is not None: if binary: - return self._repeated_execution_frequencies - - return collections.Counter( + frequencies = self._repeated_execution_frequencies + frequencies = collections.Counter( {int(k, 2): v for k, v in self._repeated_execution_frequencies.items()} ) + if self.measurement_gate.readout_mitigation is not None: + frequencies = self.measurement_gate.readout_mitigation(frequencies) + return frequencies if self._frequencies is None: if self.measurement_gate.has_bitflip_noise() and not self.has_samples(): self._samples = self.samples() if not self.has_samples(): # generate new frequencies - self._frequencies = self.backend.sample_frequencies( - self._probs, self.nshots - ) + frequencies = self.backend.sample_frequencies(self._probs, self.nshots) + if self.measurement_gate.readout_mitigation is not None: + frequencies = self.measurement_gate.readout_mitigation(frequencies) + self._frequencies = frequencies # register frequencies to individual gate ``MeasurementResult`` qubit_map = {q: i for i, q in enumerate(qubits)} - reg_frequencies = {} binary_frequencies = frequencies_to_binary( self._frequencies, len(qubits) ) @@ -256,7 +258,10 @@ def frequencies(self, binary: bool = True, registers: bool = False): self._frequencies = self.backend.calculate_frequencies( self.samples(binary=False) ) - + if self.measurement_gate.readout_mitigation is not None: + self._frequencies = self.measurement_gate.readout_mitigation( + self._frequencies + ) if registers: return { gate.register_name: gate.result.frequencies(binary)